You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/10/22 21:04:00 UTC

svn commit: r1401008 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ main/java/org/apache/hadoop/hbase/util/ test/java/or...

Author: tedyu
Date: Mon Oct 22 19:03:59 2012
New Revision: 1401008

URL: http://svn.apache.org/viewvc?rev=1401008&view=rev
Log:
HBASE-6728 prevent OOM possibility due to per connection responseQueue being unbounded


Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1401008&r1=1401007&r2=1401008&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Mon Oct 22 19:03:59 2012
@@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -52,12 +53,12 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -68,28 +69,29 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcException;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcResponseHeader.Status;
-import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.AuthMethod;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslStatus;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.util.SizeBasedThrottler;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -97,20 +99,19 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Message;
-
 import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.Span;
+import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceInfo;
 import org.cloudera.htrace.impl.NullSpan;
-import org.cloudera.htrace.Trace;
+
+import com.google.common.base.Function;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
 
 /** A client for an IPC service.  IPC calls take a single Protobuf message as a
  * parameter, and return a single Protobuf message as their value.  A service runs on
@@ -256,6 +257,14 @@ public abstract class HBaseServer implem
   protected final boolean tcpKeepAlive; // if T then use keepalives
   protected final long purgeTimeout;    // in milliseconds
 
+  // responseQueuesSizeThrottler is a single instance shared by the response
+  // queues of all connections; it bounds the combined memory occupied by
+  // queued responses. It is increased by a response's remaining bytes when the
+  // response is queued, and decreased when that response has been fully
+  // written or when its connection is closed and its queue is cleared.
+  final SizeBasedThrottler responseQueuesSizeThrottler;
+
+  // Threshold for responseQueuesSizeThrottler: the default value and the
+  // configuration key it is read from. The limit applies to the TOTAL size of
+  // responses across all response queues, not to each queue separately. A
+  // thread queueing a response waits while that total is at or above the limit;
+  // a single response larger than the limit is still accepted (see
+  // SizeBasedThrottler).
+  private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
+  private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
+
   volatile protected boolean running = true;         // true while server runs
   protected BlockingQueue<Call> callQueue; // queued calls
   protected final Counter callQueueSize = new Counter();
@@ -987,7 +996,7 @@ public abstract class HBaseServer implem
           //
           // Extract the first call
           //
-          call = responseQueue.removeFirst();
+          call = responseQueue.peek();
           SocketChannel channel = call.connection.channel;
           if (LOG.isDebugEnabled()) {
             LOG.debug(getName() + ": responding to #" + call.id + " from " +
@@ -998,9 +1007,13 @@ public abstract class HBaseServer implem
           //
           int numBytes = channelWrite(channel, call.response);
           if (numBytes < 0) {
+            // Error flag is set, so returning here closes connection and
+            // clears responseQueue.            
             return true;
           }
           if (!call.response.hasRemaining()) {
+            responseQueue.poll();
+            responseQueuesSizeThrottler.decrease(call.response.limit());            
             responseQueueLen--;
             call.connection.decRpcCount();
             //noinspection RedundantIfStatement
@@ -1014,12 +1027,6 @@ public abstract class HBaseServer implem
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
-            //
-            // If we were unable to write the entire response out, then
-            // insert in Selector queue.
-            //
-            call.connection.responseQueue.addFirst(call);
-
             if (inHandler) {
               // set the serve time when the response has to be sent later
               call.timestamp = System.currentTimeMillis();
@@ -1074,15 +1081,31 @@ public abstract class HBaseServer implem
       responseQueueLen++;
 
       boolean doRegister = false;
+      boolean closed;
+      try {
+        responseQueuesSizeThrottler.increase(call.response.remaining());
+      } catch (InterruptedException ie) {
+        throw new InterruptedIOException(ie.getMessage());
+      }
       synchronized (call.connection.responseQueue) {
-        call.connection.responseQueue.addLast(call);
-        if (call.connection.responseQueue.size() == 1) {
-          doRegister = !processResponse(call.connection.responseQueue, false);
+        closed = call.connection.closed;
+        if (!closed) {
+          call.connection.responseQueue.addLast(call);
+
+          if (call.connection.responseQueue.size() == 1) {
+            doRegister = !processResponse(call.connection.responseQueue, false);
+          }
         }
       }
       if (doRegister) {
         enqueueInSelector(call);
       }
+      if (closed) {
+        // Connection was closed when we tried to submit response, but we
+        // increased responseQueues size already. It shoud be
+        // decreased here.
+        responseQueuesSizeThrottler.decrease(call.response.remaining());
+      }      
     }
 
     private synchronized void incPending() {   // call waiting to be enqueued.
@@ -1107,6 +1130,8 @@ public abstract class HBaseServer implem
                                          //version are read
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
+
+    protected volatile boolean closed = false;    // indicates if connection was closed
     protected SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
@@ -1691,6 +1716,7 @@ public abstract class HBaseServer implem
     }
 
     protected synchronized void close() {
+      closed = true;
       disposeSasl();
       data = null;
       dataLengthBuffer = null;
@@ -1946,6 +1972,9 @@ public abstract class HBaseServer implem
     this.delayedCalls = new AtomicInteger(0);
 
 
+    this.responseQueuesSizeThrottler = new SizeBasedThrottler(
+        conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
+
     // Create the responder here
     responder = new Responder();
     this.authorize =
@@ -1990,6 +2019,14 @@ public abstract class HBaseServer implem
       }
     }
     connection.close();
+    long bytes = 0;
+    synchronized (connection.responseQueue) {
+      for (Call c : connection.responseQueue) {
+        bytes += c.response.limit();
+      }
+      connection.responseQueue.clear();
+    }
+    responseQueuesSizeThrottler.decrease(bytes);    
     rpcMetrics.numOpenConnections.set(numConnections);
   }
 
@@ -2244,4 +2281,8 @@ public abstract class HBaseServer implem
   public static RpcCallContext getCurrentCall() {
     return CurCall.get();
   }
+
+  /**
+   * Reports how many response bytes are currently accounted for by the shared
+   * response-queue throttler, i.e. the estimated total size of responses
+   * waiting in all connections' response queues.
+   *
+   * @return current value of responseQueuesSizeThrottler
+   */
+  public long getResponseQueueSize() {
+    final long queuedBytes = responseQueuesSizeThrottler.getCurrentValue();
+    return queuedBytes;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1401008&r1=1401007&r2=1401008&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Oct 22 19:03:59 2012
@@ -53,7 +53,6 @@ import java.util.concurrent.locks.Reentr
 
 import javax.management.ObjectName;
 
-import com.google.protobuf.Message;
 import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -103,8 +102,8 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -113,6 +112,7 @@ import org.apache.hadoop.hbase.ipc.Copro
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
+import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.ProtocolSignature;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -152,6 +152,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@@ -196,8 +198,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -228,12 +230,10 @@ import org.codehaus.jackson.map.ObjectMa
 
 import com.google.common.base.Function;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
-import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -262,6 +262,9 @@ public class  HRegionServer implements C
 
   protected long maxScannerResultSize;
 
+  // Server to handle client requests.
+  private HBaseServer server;
+  
   // Cache flushing
   protected MemStoreFlusher cacheFlusher;
 
@@ -518,6 +521,7 @@ public class  HRegionServer implements C
         conf.getInt("hbase.regionserver.metahandler.count", 10),
         conf.getBoolean("hbase.rpc.verbose", false),
         conf, HConstants.QOS_THRESHOLD);
+    if (rpcServer instanceof HBaseServer) server = (HBaseServer) rpcServer;
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
 
@@ -1197,7 +1201,7 @@ public class  HRegionServer implements C
       this.hlog = setupWALAndReplication();
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
-      this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
+      this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
       startServiceThreads();
       LOG.info("Serving as " + this.serverNameFromMasterPOV +
         ", RPC listening on " + this.isa +
@@ -4131,4 +4135,11 @@ public class  HRegionServer implements C
   private String getMyEphemeralNodePath() {
     return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
   }
+  
+  /**
+   * Returns the estimated total size of all RPC response queues as reported by
+   * the underlying HBaseServer. When the RPC server is not an HBaseServer the
+   * {@code server} field is never assigned and 0 is reported instead.
+   *
+   * @return estimated response-queue size, or 0 if unavailable
+   */
+  public long getResponseQueueSize() {
+    return (server == null) ? 0 : server.getResponseQueueSize();
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java?rev=1401008&r1=1401007&r2=1401008&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java Mon Oct 22 19:03:59 2012
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -59,6 +60,7 @@ public class RegionServerDynamicMetrics 
   private MetricsContext context;
   private final RegionServerDynamicStatistics rsDynamicStatistics;
   private Method updateMbeanInfoIfMetricsListChanged = null;
+  private HRegionServer regionServer;
   private static final Log LOG =
     LogFactory.getLog(RegionServerDynamicStatistics.class);
   
@@ -74,13 +76,14 @@ public class RegionServerDynamicMetrics 
    */
   public final MetricsRegistry registry = new MetricsRegistry();
 
-  private RegionServerDynamicMetrics() {
+  private RegionServerDynamicMetrics(HRegionServer regionServer) {
     this.context = MetricsUtil.getContext("hbase-dynamic");
     this.metricsRecord = MetricsUtil.createRecord(
                             this.context,
                             "RegionServerDynamicStatistics");
     context.registerUpdater(this);
     this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
+    this.regionServer = regionServer;
     try {
       updateMbeanInfoIfMetricsListChanged =
         this.rsDynamicStatistics.getClass().getSuperclass()
@@ -92,9 +95,9 @@ public class RegionServerDynamicMetrics 
     }
   }
 
-  public static RegionServerDynamicMetrics newInstance() {
+  public static RegionServerDynamicMetrics newInstance(HRegionServer regionServer) {
     RegionServerDynamicMetrics metrics =
-      new RegionServerDynamicMetrics();
+      new RegionServerDynamicMetrics(regionServer);
     return metrics;
   }
 
@@ -184,6 +187,13 @@ public class RegionServerDynamicMetrics 
     for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericMetrics().entrySet()) {
       this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
     }
+
+    /* export estimated size of all response queues */
+    if (regionServer != null) {
+      long responseQueueSize = regionServer.getResponseQueueSize();
+      this.setNumericMetric("responseQueuesSize", responseQueueSize);
+    }
+
     /* get dynamically created numeric metrics, and push the metrics.
      * These ones aren't to be reset; they are cumulative. */
     for (Entry<String, AtomicLong> entry : RegionMetricsStorage.getNumericPersistentMetrics().entrySet()) {

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java?rev=1401008&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java Mon Oct 22 19:03:59 2012
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Utility class that can be used to implement
+ * queues with limited capacity (in terms of memory).
+ * It maintains an internal counter and provides
+ * two operations: increase and decrease.
+ * Increase blocks until the internal counter is lower than
+ * the given threshold and then increases the internal counter.
+ * Decrease decreases the internal counter and wakes up
+ * waiting threads when the counter drops below the threshold.
+ *
+ * This implementation allows the internal counter to become
+ * greater than the threshold. That happens when the counter is
+ * lower than the threshold and increase is called with a 'delta'
+ * big enough that the sum of delta and the counter exceeds the
+ * threshold. This is intentional; it solves some problems:
+ *   - a thread calling increase with a big argument is not
+ *     starved by other threads calling increase with small
+ *     arguments.
+ *   - a thread calling increase with an argument greater than
+ *     the threshold won't deadlock. This is useful when throttling
+ *     queues - you can submit an object that is bigger than the limit.
+ *
+ * Thread-safety: increase and decrease are synchronized on this
+ * instance, so the decision whether to block and the subsequent
+ * update of the counter happen atomically. Once the counter is at
+ * or above the threshold, no further increase proceeds until a
+ * decrease brings it back below. {@link #getCurrentValue()} reads
+ * the counter without taking the lock.
+ */
+@InterfaceAudience.Private
+public class SizeBasedThrottler {
+
+  private final long threshold;
+  private final AtomicLong currentSize;
+
+  /**
+   * Creates a SizeBasedThrottler with the provided threshold.
+   *
+   * @param threshold threshold used by this instance; must be positive
+   * @throws IllegalArgumentException if threshold is not greater than 0
+   */
+  public SizeBasedThrottler(long threshold) {
+    if (threshold <= 0) {
+      throw new IllegalArgumentException("Threshold must be greater than 0");
+    }
+    this.threshold = threshold;
+    this.currentSize = new AtomicLong(0);
+  }
+
+  /**
+   * Blocks until the internal counter is lower than the threshold
+   * and then increases the internal counter by {@code delta}.
+   *
+   * The check and the increment are done while holding this
+   * instance's monitor, so two callers cannot both proceed past a
+   * counter that is already at or above the threshold. A call that
+   * starts while the counter is below the threshold always succeeds
+   * immediately, even if it pushes the counter past the threshold.
+   *
+   * @param delta increase internal counter by this value
+   * @return new value of internal counter
+   * @throws InterruptedException when interrupted during waiting
+   */
+  public synchronized long increase(long delta) throws InterruptedException {
+    // wait() releases this monitor, which lets decrease() run and notify us;
+    // the loop re-checks because another waiter may have refilled the counter.
+    while (currentSize.get() >= threshold) {
+      wait();
+    }
+    return currentSize.addAndGet(delta);
+  }
+
+  /**
+   * Decreases the internal counter by {@code delta} and wakes up waiting
+   * threads if the counter has just dropped below the threshold.
+   *
+   * @param delta decrease internal counter by this value
+   * @return new value of internal counter
+   */
+  public synchronized long decrease(long delta) {
+    final long newSize = currentSize.addAndGet(-delta);
+
+    // Waiters can only be blocked while the counter was at/above the
+    // threshold, so only a transition from there to below it needs a wakeup.
+    if (newSize < threshold && newSize + delta >= threshold) {
+      notifyAll();
+    }
+
+    return newSize;
+  }
+
+  /**
+   * Returns the current value of the internal counter. This read does not
+   * take the monitor, so it never waits behind increase/decrease.
+   *
+   * @return current value of internal counter
+   */
+  public long getCurrentValue() {
+    return currentSize.get();
+  }
+
+  /**
+   * @return threshold
+   */
+  public long getThreshold() {
+    return threshold;
+  }
+}

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java?rev=1401008&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java Mon Oct 22 19:03:59 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Concurrency tests for {@link SizeBasedThrottler}.
+ *
+ * Every scenario starts a large number of threads that are released at the
+ * same moment; each one increases and then decreases a single shared
+ * throttler. A run fails if any thread sees the counter above a given
+ * ceiling, if a thread throws, or if the threads do not all finish within
+ * the time budget. The interesting interleavings only show up occasionally,
+ * so every scenario is repeated {@link #REPEATS} times.
+ */
+@Category(MediumTests.class)
+public class TestSizeBasedThrottler {
+
+  private static final int REPEATS = 100;
+
+  /** Number of concurrent threads started per scenario run. */
+  private static final int THREADS_PER_RUN = 1000;
+
+  /** Total time budget, in milliseconds, for all threads of one run. */
+  private static final long TIMEOUT_MS = 200;
+
+  /**
+   * Starts a worker thread that waits for the start signal, takes
+   * {@code delta} units from the throttler, flags a failure if the counter
+   * went past {@code limit}, and then returns the units.
+   */
+  private Thread makeThread(final SizeBasedThrottler throttler,
+      final AtomicBoolean failed, final int delta,
+      final int limit, final CountDownLatch latch) {
+    Runnable worker = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          latch.await();
+          long observed = throttler.increase(delta);
+          if (observed > limit) {
+            failed.set(true);
+          }
+          throttler.decrease(delta);
+        } catch (Exception e) {
+          // Any unexpected exception (including interruption) fails the run.
+          failed.set(true);
+        }
+      }
+    };
+    Thread thread = new Thread(worker);
+    thread.start();
+    return thread;
+  }
+
+  /**
+   * Runs {@code numberOfThreads} workers against one throttler and fails if
+   * any of them misbehaves or if they do not all finish within
+   * {@code timeout} milliseconds of cumulative waiting.
+   */
+  private void runGenericTest(int threshold, int delta, int maxValueAllowed,
+      int numberOfThreads, long timeout) {
+    SizeBasedThrottler throttler = new SizeBasedThrottler(threshold);
+    AtomicBoolean failed = new AtomicBoolean(false);
+    CountDownLatch startSignal = new CountDownLatch(1);
+
+    ArrayList<Thread> workers = new ArrayList<Thread>(numberOfThreads);
+    for (int n = 0; n < numberOfThreads; n++) {
+      workers.add(makeThread(throttler, failed, delta, maxValueAllowed, startSignal));
+    }
+
+    // Release every worker at once, then join them against one shared budget.
+    startSignal.countDown();
+    long waited = 0;
+    for (Thread worker : workers) {
+      try {
+        long joinStart = System.currentTimeMillis();
+        worker.join(timeout - waited);
+        waited += System.currentTimeMillis() - joinStart;
+        boolean timedOut = worker.isAlive() || waited >= timeout;
+        if (timedOut) {
+          fail("Timeout reached.");
+        }
+      } catch (InterruptedException e) {
+        fail("Got InterruptedException");
+      }
+    }
+
+    assertFalse(failed.get());
+  }
+
+  /**
+   * Runs one scenario {@link #REPEATS} times with {@link #THREADS_PER_RUN}
+   * threads and a {@link #TIMEOUT_MS} budget per run.
+   */
+  private void repeatScenario(int threshold, int delta, int maxValueAllowed) {
+    for (int round = 0; round < REPEATS; round++) {
+      runGenericTest(threshold, delta, maxValueAllowed, THREADS_PER_RUN, TIMEOUT_MS);
+    }
+  }
+
+  /** Threshold 10, delta 1: the counter must never exceed 15. */
+  @Test
+  public void testSmallIncreases() {
+    repeatScenario(10, 1, 15);
+  }
+
+  /** Threshold 1, delta 2 (every increase overshoots): never above 4. */
+  @Test
+  public void testBigIncreases() {
+    repeatScenario(1, 2, 4);
+  }
+
+  /** Threshold 1, delta 1 (every increase reaches the threshold): never above 2. */
+  @Test
+  public void testIncreasesEqualToThreshold() {
+    repeatScenario(1, 1, 2);
+  }
+}