You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/15 14:50:46 UTC

svn commit: r1385058 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ main/java/org/apache/hadoop/hbase/util/ test/java/org/...

Author: mbautin
Date: Sat Sep 15 12:50:45 2012
New Revision: 1385058

URL: http://svn.apache.org/viewvc?rev=1385058&view=rev
Log:
[jira] [HBASE-6728] [89-fb] bound responseQueue to prevent OOM

Author: michalgr

Summary:
In HBaseServer, if a connection is not able to send responses as fast as they are produced, the responses are queued in a per-connection queue. The queue is unbounded, and this can cause an OOM.

This patch introduces a size-based bound: a configurable limit (ipc.server.response.queue.maxsize, default 1 GB) on how much memory can be occupied by responses across all response queues. If this limit is exceeded, handler threads have to wait before submitting a response.

Test Plan: Unit tests. I will also try to observe the impact on latency in the performance test I used to compare different HBase changes.

Reviewers: kannan

Reviewed By: kannan

CC: Karthik, mbautin, avf, Liyin, gqchen, lhofhansl

Differential Revision: https://reviews.facebook.net/D5337

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1385058&r1=1385057&r2=1385058&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sat Sep 15 12:50:45 2012
@@ -59,6 +59,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+import org.apache.hadoop.hbase.util.SizeBasedThrottler;
 import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
@@ -184,6 +185,14 @@ public abstract class HBaseServer {
   protected final boolean tcpNoDelay;   // if T then disable Nagle's Algorithm
   protected final boolean tcpKeepAlive; // if T then use keepalives
 
+  // responseQueuesSizeThrottler is shared among all responseQueues,
+  // it bounds memory occupied by responses in all responseQueues
+  final SizeBasedThrottler responseQueuesSizeThrottler;
+
+  // RESPONSE_QUEUE_MAX_SIZE limits total size of responses in every response queue
+  private static final long DEFAULT_RESPONSE_QUEUES_MAX_SIZE = 1024 * 1024 * 1024; // 1G
+  private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
+
   volatile protected boolean running = true;         // true while server runs
   protected BlockingQueue<Call> callQueue; // queued calls
 
@@ -749,7 +758,7 @@ public abstract class HBaseServer {
           //
           // Extract the first call
           //
-          call = responseQueue.removeFirst();
+          call = responseQueue.peek();
           SocketChannel channel = call.connection.channel;
           if (LOG.isTraceEnabled()) {
             LOG.trace(getName() + ": responding to #" + call.id + " from " +
@@ -760,9 +769,13 @@ public abstract class HBaseServer {
           //
           int numBytes = channelWrite(channel, call.response);
           if (numBytes < 0) {
+            // Error flag is set, so returning here closes connection and
+            // clears responseQueue.
             return true;
           }
           if (!call.response.hasRemaining()) {
+            responseQueue.poll();
+            responseQueuesSizeThrottler.decrease(call.response.limit());
             call.connection.decRpcCount();
             //noinspection RedundantIfStatement
             if (numElements == 1) {    // last call fully processes.
@@ -775,12 +788,6 @@ public abstract class HBaseServer {
                         call.connection + " Wrote " + numBytes + " bytes.");
             }
           } else {
-            //
-            // If we were unable to write the entire response out, then
-            // insert in Selector queue.
-            //
-            call.connection.responseQueue.addFirst(call);
-
             if (inHandler) {
               // set the serve time when the response has to be sent later
               call.timestamp = System.currentTimeMillis();
@@ -819,13 +826,24 @@ public abstract class HBaseServer {
     //
     // Enqueue a response from the application.
     //
-    void doRespond(Call call) throws IOException {
+    void doRespond(Call call) throws IOException, InterruptedException {
+      boolean closed;
+      responseQueuesSizeThrottler.increase(call.response.remaining());
       synchronized (call.connection.responseQueue) {
-        call.connection.responseQueue.addLast(call);
-        if (call.connection.responseQueue.size() == 1) {
-          processResponse(call.connection.responseQueue, true);
+        closed = call.connection.closed;
+        if (!closed) {
+          call.connection.responseQueue.addLast(call);
+          if (call.connection.responseQueue.size() == 1) {
+            processResponse(call.connection.responseQueue, true);
+          }
         }
       }
+      if (closed) {
+        // Connection was closed when we tried to submit response, but we
+        // increased responseQueues size already. It shoud be
+        // decreased here.
+        responseQueuesSizeThrottler.decrease(call.response.remaining());
+      }
     }
 
     private synchronized void incPending() {   // call waiting to be enqueued.
@@ -851,6 +869,8 @@ public abstract class HBaseServer {
     private int version = -1;
     private boolean headerRead = false;  //if the connection header that
                                          //follows version is read.
+
+    protected volatile boolean closed = false;    // indicates if connection was closed
     protected SocketChannel channel;
     private ByteBuffer data;
     private ByteBuffer dataLengthBuffer;
@@ -1053,6 +1073,7 @@ public abstract class HBaseServer {
     }
 
     protected synchronized void close() {
+      closed = true;
       data = null;
       dataLengthBuffer = null;
       if (!channel.isOpen())
@@ -1253,6 +1274,9 @@ public abstract class HBaseServer {
     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
     this.tcpKeepAlive = conf.getBoolean("ipc.server.tcpkeepalive", true);
 
+    this.responseQueuesSizeThrottler = new SizeBasedThrottler(
+        conf.getLong(RESPONSE_QUEUES_MAX_SIZE, DEFAULT_RESPONSE_QUEUES_MAX_SIZE));
+
     // Create the responder here
     responder = new Responder();
   }
@@ -1263,6 +1287,15 @@ public abstract class HBaseServer {
         numConnections--;
     }
     connection.close();
+
+    long bytes = 0;
+    synchronized (connection.responseQueue) {
+      for (Call c : connection.responseQueue) {
+        bytes += c.response.limit();
+      }
+      connection.responseQueue.clear();
+    }
+    responseQueuesSizeThrottler.decrease(bytes);
   }
 
   /** Sets the socket buffer size used for responding to RPCs.
@@ -1460,4 +1493,8 @@ public abstract class HBaseServer {
       selectionKey.interestOps(selectionKey.interestOps() & (~SelectionKey.OP_READ));
     }
   }
+
+  public long getResponseQueueSize(){
+    return responseQueuesSizeThrottler.getCurrentValue();
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1385058&r1=1385057&r2=1385058&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Sep 15 12:50:45 2012
@@ -979,7 +979,7 @@ public class HRegionServer implements HR
       this.hlog = setupHLog();
       // Init in here rather than in constructor after thread name has been set
       this.metrics = new RegionServerMetrics();
-      this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
+      this.dynamicMetrics = RegionServerDynamicMetrics.newInstance(this);
       startServiceThreads();
       isOnline = true;
 
@@ -3493,4 +3493,12 @@ public class HRegionServer implements HR
     }
   }
   
+  public long getResponseQueueSize(){
+    HBaseServer s = server;
+    if (s != null) {
+      return s.getResponseQueueSize();
+    }
+    return 0;
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java?rev=1385058&r1=1385057&r2=1385058&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerDynamicMetrics.java Sat Sep 15 12:50:45 2012
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -55,6 +56,7 @@ public class RegionServerDynamicMetrics 
   private MetricsContext context;
   private final RegionServerDynamicStatistics rsDynamicStatistics;
   private Method updateMbeanInfoIfMetricsListChanged = null;
+  private HRegionServer regionServer;
   private static final Log LOG =
     LogFactory.getLog(RegionServerDynamicStatistics.class);
 
@@ -65,12 +67,13 @@ public class RegionServerDynamicMetrics 
    */
   public final MetricsRegistry registry = new MetricsRegistry();
 
-  private RegionServerDynamicMetrics() {
+  private RegionServerDynamicMetrics(HRegionServer regionServer) {
     this.context = MetricsUtil.getContext("hbase");
     this.metricsRecord = MetricsUtil.createRecord(
                             this.context,
                             "RegionServerDynamicStatistics");
     this.rsDynamicStatistics = new RegionServerDynamicStatistics(this.registry);
+    this.regionServer = regionServer;
     try {
       updateMbeanInfoIfMetricsListChanged =
         this.rsDynamicStatistics.getClass().getSuperclass()
@@ -82,9 +85,9 @@ public class RegionServerDynamicMetrics 
     }
   }
 
-  public static RegionServerDynamicMetrics newInstance() {
+  public static RegionServerDynamicMetrics newInstance(HRegionServer regionServer) {
     RegionServerDynamicMetrics metrics =
-      new RegionServerDynamicMetrics();
+      new RegionServerDynamicMetrics(regionServer);
     metrics.context.registerUpdater(metrics);
     return metrics;
   }
@@ -135,6 +138,13 @@ public class RegionServerDynamicMetrics 
     for (Entry<String, AtomicLong> entry : HRegion.numericMetrics.entrySet()) {
       this.setNumericMetric(entry.getKey(), entry.getValue().getAndSet(0));
     }
+
+    /* export estimated size of all response queues */
+    if (regionServer != null) {
+      long responseQueueSize = regionServer.getResponseQueueSize();
+      this.setNumericMetric("responseQueuesSize", responseQueueSize);
+    }
+
     /* get dynamically created numeric metrics, and push the metrics.
      * These ones aren't to be reset; they are cumulative. */
     for (Entry<String, AtomicLong> entry : HRegion.numericPersistentMetrics.entrySet()) {

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java?rev=1385058&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/SizeBasedThrottler.java Sat Sep 15 12:50:45 2012
@@ -0,0 +1,105 @@
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Utility class that can be used to implement
+ * queues with limited capacity (in terms of memory).
+ * It maintains an internal counter and provides
+ * two operations: increase and decrease.
+ * Increase blocks until the internal counter is lower than
+ * the given threshold and then increases the internal counter.
+ * Decrease decreases the internal counter and wakes up
+ * waiting threads when the counter drops below the threshold.
+ *
+ * This implementation allows the internal counter to become
+ * greater than the threshold. It happens when the counter is
+ * lower than the threshold and increase is called with a
+ * 'delta' big enough that the sum of delta and the counter
+ * exceeds the threshold. This is not a bug, this is a feature.
+ * It solves some problems:
+ *   - a thread calling increase with a big argument will not be
+ *     starved by other threads calling increase with small
+ *     arguments.
+ *   - a thread calling increase with an argument greater than
+ *     the threshold won't deadlock. This is useful when throttling
+ *     queues - you can submit an object that is bigger than the limit.
+ *
+ * All methods that touch the counter are synchronized on this
+ * instance, so the threshold check and the counter update happen
+ * atomically with respect to each other. Consequently, for a
+ * non-negative delta the value returned by {@code increase(delta)}
+ * is always lower than {@code threshold + delta}.
+ */
+public class SizeBasedThrottler {
+
+  private final long threshold;
+
+  /** Current value of the internal counter; guarded by {@code this}. */
+  private long currentSize;
+
+  /**
+   * Creates a SizeBasedThrottler with the provided threshold.
+   *
+   * @param threshold threshold used by instance; must be positive
+   * @throws IllegalArgumentException if threshold is not positive
+   */
+  public SizeBasedThrottler(long threshold) {
+    if (threshold <= 0) {
+      throw new IllegalArgumentException(
+          "Threshold must be greater than 0, got " + threshold);
+    }
+    this.threshold = threshold;
+    this.currentSize = 0;
+  }
+
+  /**
+   * Blocks until the internal counter is lower than the threshold
+   * and then increases the value of the internal counter.
+   *
+   * The check and the increment are done while holding this
+   * instance's monitor; {@link Object#wait()} releases the monitor
+   * while blocked, so {@link #decrease(long)} can still make progress.
+   *
+   * @param delta increase internal counter by this value
+   * @return new value of internal counter
+   * @throws InterruptedException when interrupted during waiting
+   */
+  public synchronized long increase(long delta) throws InterruptedException {
+    while (currentSize >= threshold) {
+      wait();
+    }
+    currentSize += delta;
+    return currentSize;
+  }
+
+  /**
+   * Decreases the value of the internal counter and wakes up waiting
+   * threads if this call brings the counter below the threshold.
+   *
+   * @param delta decrease internal counter by this value
+   * @return new value of internal counter
+   */
+  public synchronized long decrease(long delta) {
+    final long previousSize = currentSize;
+    currentSize -= delta;
+    // Threads only wait while the counter is at or above the threshold,
+    // so they need to be woken only when this call crosses below it.
+    if (previousSize >= threshold && currentSize < threshold) {
+      notifyAll();
+    }
+    return currentSize;
+  }
+
+  /**
+   * @return current value of internal counter
+   */
+  public synchronized long getCurrentValue() {
+    return currentSize;
+  }
+
+  /**
+   * @return threshold
+   */
+  public long getThreshold() {
+    return threshold;
+  }
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java?rev=1385058&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestSizeBasedThrottler.java Sat Sep 15 12:50:45 2012
@@ -0,0 +1,116 @@
+package org.apache.hadoop.hbase.util;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Stress tests for SizeBasedThrottler: threads released together each call
+ * increase() then decrease(); a run fails if increase() returns more than the
+ * allowed bound, or if the threads miss the (timing-sensitive) join timeout.
+ */
+public class TestSizeBasedThrottler {
+
+  private static final int REPEATS = 100; // runs of each scenario per test
+
+  private Thread makeThread(final SizeBasedThrottler throttler,
+      final AtomicBoolean failed, final int delta,
+      final int limit, final CountDownLatch latch) {
+
+    Thread ret = new Thread(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          latch.await(); // block until runGenericTest releases all workers
+          if (throttler.increase(delta) > limit) { // check post-increment value
+            failed.set(true);
+          }
+          throttler.decrease(delta);
+        } catch (Exception e) { // e.g. InterruptedException counts as failure
+          failed.set(true);
+        }
+      }
+    });
+
+    ret.start();
+    return ret;
+
+  }
+
+  private void runGenericTest(int threshold, int delta, int maxValueAllowed,
+      int numberOfThreads, long timeout) {
+    SizeBasedThrottler throttler = new SizeBasedThrottler(threshold);
+    AtomicBoolean failed = new AtomicBoolean(false);
+
+    ArrayList<Thread> threads = new ArrayList<Thread>(numberOfThreads);
+    CountDownLatch latch = new CountDownLatch(1);
+    long timeElapsed = 0; // join time used so far, out of the total timeout
+
+    for (int i = 0; i < numberOfThreads; ++i) {
+      threads.add(makeThread(throttler, failed, delta, maxValueAllowed, latch));
+    }
+
+    latch.countDown(); // release all worker threads at once
+    for (Thread t : threads) {
+      try {
+        long beforeJoin = System.currentTimeMillis();
+        t.join(timeout - timeElapsed); // > 0 here; join(0) would wait forever
+        timeElapsed += System.currentTimeMillis() - beforeJoin;
+        if (t.isAlive() || timeElapsed >= timeout) {
+          fail("Timeout reached.");
+        }
+      } catch (InterruptedException e) {
+        fail("Got InterruptedException");
+      }
+    }
+
+    assertFalse(failed.get());
+  }
+
+  @Test
+  public void testSmallIncreases(){
+    for (int i = 0; i < REPEATS; ++i) {
+      runGenericTest(
+          10, // threshold
+          1,  // delta
+          15, // fail if throttler's value
+              // exceeds 15
+          1000, // use 1000 threads
+          200 // wait for 200ms
+          );
+    }
+  }
+
+  @Test
+  public void testBigIncreases() {
+    for (int i = 0; i < REPEATS; ++i) {
+      runGenericTest(
+          1, // threshold
+          2, // delta
+          4, // fail if throttler's value
+             // exceeds 4
+          1000, // use 1000 threads
+          200 // wait for 200ms
+          );
+    }
+  }
+
+  @Test
+  public void testIncreasesEqualToThreshold(){
+    for (int i = 0; i < REPEATS; ++i) {
+      runGenericTest(
+          1, // threshold
+          1, // delta
+          2, // fail if throttler's value
+             // exceeds 2
+          1000, // use 1000 threads
+          200 // wait for 200ms
+          );
+    }
+  }
+
+}