You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2015/01/21 22:50:06 UTC

flume git commit: FLUME-2594: Close Async HBase Client if there are large number of consecutive timeouts

Repository: flume
Updated Branches:
  refs/heads/trunk 91c58804d -> 82631f811


FLUME-2594: Close Async HBase Client if there are large number of consecutive timeouts

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/82631f81
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/82631f81
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/82631f81

Branch: refs/heads/trunk
Commit: 82631f811a7c6a4d8e6ec886f80d1a21876947e5
Parents: 91c5880
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jan 21 13:49:41 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jan 21 13:49:41 2015 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hbase/AsyncHBaseSink.java | 74 ++++++++++++++++++--
 1 file changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/82631f81/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 1666be4..1d05189 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -127,12 +127,21 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private boolean batchIncrements = false;
   private volatile int totalCallbacksReceived = 0;
   private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
+  // The HBaseClient buffers requests until a callback is received. In the event of a timeout,
+  // there is no way to clear these buffers. If there is a major cluster issue, the buffers can
+  // grow large enough to cause crashes. So once we hit a fixed number of consecutive HBase write
+  // failures/timeouts, we close the HBase client (gracefully or not) and drop our reference to
+  // it so that the buffered data can be garbage collected.
+  private int consecutiveHBaseFailures = 0;
+  private boolean lastTxnFailed = false;
 
   // Does not need to be thread-safe. Always called only from the sink's
   // process method.
   private final Comparator<byte[]> COMPARATOR = UnsignedBytes
     .lexicographicalComparator();
 
+  private static final int MAX_CONSECUTIVE_FAILS = 10;
+
   public AsyncHBaseSink(){
     this(null);
   }
@@ -162,6 +171,12 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       throw new EventDeliveryException("Sink was never opened. " +
           "Please fix the configuration.");
     }
+    if (client == null) {
+      client = initHBaseClient();
+      if (client == null) {
+        throw new EventDeliveryException("Could not establish connection to HBase!");
+      }
+    }
     AtomicBoolean txnFail = new AtomicBoolean(false);
     AtomicInteger callbacksReceived = new AtomicInteger(0);
     AtomicInteger callbacksExpected = new AtomicInteger(0);
@@ -292,11 +307,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
      *
      */
     if (txnFail.get()) {
+      // We enter this if condition only if the failure was due to HBase failure, so we make sure
+      // we track the consecutive failures.
+      if (lastTxnFailed) {
+        consecutiveHBaseFailures++;
+      }
+      lastTxnFailed = true;
       this.handleTransactionFailure(txn);
       throw new EventDeliveryException("Could not write events to Hbase. " +
           "Transaction failed, and rolled back.");
     } else {
       try {
+        lastTxnFailed = false;
+        consecutiveHBaseFailures = 0;
         txn.commit();
         txn.close();
         sinkCounter.addToEventDrainSuccessCount(i);
@@ -414,7 +437,12 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setNameFormat(this.getName() + " HBase Call Pool").build());
     logger.info("Callback pool created");
-    if(!isTimeoutTest) {
+    client = initHBaseClient();
+    super.start();
+  }
+
+  private HBaseClient initHBaseClient() {
+    if (!isTimeoutTest) {
       client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
     } else {
       client = new HBaseClient(zkQuorum, zkBaseDir,
@@ -454,8 +482,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
     if(fail.get()){
       sinkCounter.incrementConnectionFailedCount();
-      client.shutdown();
-      client = null;
+      if (client != null) {
+        shutdownHBaseClient();
+      }
       throw new FlumeException(
           "Could not start sink. " +
           "Table or column family does not exist in Hbase.");
@@ -463,14 +492,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       open = true;
     }
     client.setFlushInterval((short) 0);
-    super.start();
+    return client;
   }
 
   @Override
   public void stop(){
     serializer.cleanUp();
     if (client != null) {
-      client.shutdown();
+      shutdownHBaseClient();
     }
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
@@ -496,8 +525,43 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     super.stop();
   }
 
+  private void shutdownHBaseClient() { // Best-effort async shutdown; always drops the client.
+    final CountDownLatch waiter = new CountDownLatch(1);
+    try {
+      client.shutdown().addCallback(new Callback<Object, Object>() {
+        @Override
+        public Object call(Object arg) throws Exception {
+          waiter.countDown(); // Shutdown completed; release the waiting thread.
+          return null;
+        }
+      }).addErrback(new Callback<Object, Object>() {
+        @Override
+        public Object call(Object arg) throws Exception {
+          logger.error("Failed to shutdown HBase client cleanly! HBase cluster might be down");
+          waiter.countDown(); // Release the waiter on failure too, so we never block needlessly.
+          return null;
+        }
+      });
+      if (!waiter.await(timeout, TimeUnit.NANOSECONDS)) {
+        logger.error("HBase connection could not be closed within timeout! HBase cluster might " +
+          "be down!");
+      }
+    } catch (Exception ex) {
+      logger.warn("Error while attempting to close connections to HBase", ex); // Keep the cause.
+    } finally {
+      // Always drop the reference so the client's buffered requests become eligible for GC.
+      client = null;
+    }
+  }
+
   private void handleTransactionFailure(Transaction txn)
       throws EventDeliveryException {
+    if (consecutiveHBaseFailures >= MAX_CONSECUTIVE_FAILS) {
+      if (client != null) {
+        shutdownHBaseClient();
+      }
+      consecutiveHBaseFailures = 0;
+    }
     try {
       txn.rollback();
     } catch (Throwable e) {