You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2012/07/27 18:48:43 UTC

svn commit: r1366445 - /flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java

Author: jarcec
Date: Fri Jul 27 16:48:42 2012
New Revision: 1366445

URL: http://svn.apache.org/viewvc?rev=1366445&view=rev
Log:
FLUME-1398. Improve concurrency for async hbase sink.

(Hari Shreedharan via Jarek Jarcec Cecho)

Modified:
    flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java

Modified: flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java?rev=1366445&r1=1366444&r2=1366445&view=diff
==============================================================================
--- flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java (original)
+++ flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java Fri Jul 27 16:48:42 2012
@@ -18,14 +18,12 @@
  */
 package org.apache.flume.sink.hbase;
 
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -45,6 +43,11 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.stumbleupon.async.Callback;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.flume.instrumentation.SinkCounter;
 
 /**
 *
@@ -86,8 +89,8 @@ public class AsyncHBaseSink extends Abst
   private String tableName;
   private byte[] columnFamily;
   private long batchSize;
-  private CounterGroup counterGroup = new CounterGroup();
-  private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
+  private static final Logger logger =
+          LoggerFactory.getLogger(AsyncHBaseSink.class);
   private AsyncHbaseEventSerializer serializer;
   private String eventSerializerType;
   private Context serializerContext;
@@ -95,6 +98,7 @@ public class AsyncHBaseSink extends Abst
   private Configuration conf;
   private Transaction txn;
   private volatile boolean open = false;
+  private SinkCounter sinkCounter;
 
   public AsyncHBaseSink(){
     conf = HBaseConfiguration.create();
@@ -117,48 +121,83 @@ public class AsyncHBaseSink extends Abst
           "Please fix the configuration.");
     }
     AtomicBoolean txnFail = new AtomicBoolean(false);
+    AtomicInteger callbacksReceived = new AtomicInteger(0);
+    AtomicInteger callbacksExpected = new AtomicInteger(0);
+    final Lock lock = new ReentrantLock();
+    final Condition condition = lock.newCondition();
+    /*
+     * Callbacks can be reused per transaction, since they share the same
+     * locks and conditions.
+     */
+    Callback<Object, Object> putSuccessCallback =
+            new SuccessCallback<Object, Object>(
+            lock, callbacksReceived, condition);
+    Callback<Object, Object> putFailureCallback =
+            new FailureCallback<Object, Object>(
+            lock, callbacksReceived, txnFail, condition);
+
+    Callback<Long, Long> incrementSuccessCallback =
+            new SuccessCallback<Long, Long>(
+            lock, callbacksReceived, condition);
+    Callback<Long, Long> incrementFailureCallback =
+            new FailureCallback<Long, Long>(
+            lock, callbacksReceived, txnFail, condition);
+
     Status status = Status.READY;
     Channel channel = getChannel();
     txn = channel.getTransaction();
     txn.begin();
-    List<PutRequest> actions = new LinkedList<PutRequest>();
-    List<AtomicIncrementRequest> increments =
-        new LinkedList<AtomicIncrementRequest>();
-    for(int i = 0; i < batchSize; i++){
+    int i = 0;
+    for (; i < batchSize; i++) {
       Event event = channel.take();
-      if(event == null){
+      if (event == null) {
         status = Status.BACKOFF;
-        counterGroup.incrementAndGet("channel.underflow");
+        if (i == 0) {
+          sinkCounter.incrementBatchEmptyCount();
+        } else {
+          sinkCounter.incrementBatchUnderflowCount();
+        }
         break;
       } else {
         serializer.setEvent(event);
-        actions.addAll(serializer.getActions());
-        increments.addAll(serializer.getIncrements());
+        List<PutRequest> actions = serializer.getActions();
+        List<AtomicIncrementRequest> increments = serializer.getIncrements();
+        callbacksExpected.addAndGet(actions.size() + increments.size());
+
+        for (PutRequest action : actions) {
+          client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
+        }
+        for (AtomicIncrementRequest increment : increments) {
+          client.atomicIncrement(increment).addCallbacks(
+                  incrementSuccessCallback, incrementFailureCallback);
+        }
       }
     }
-    CountDownLatch latch =
-        new CountDownLatch(actions.size() + increments.size());
-    for(PutRequest action : actions) {
-      Callback<Object, Object> callback =
-          new SuccessCallback<Object, Object>(latch);
-      Callback<Object, Object> errback =
-          new ErrBack<Object, Object>(latch, txnFail);
-      client.put(action).addCallbacks(callback, errback);
-    }
-    for(AtomicIncrementRequest increment : increments) {
-      Callback<Long, Long> callback =
-          new SuccessCallback<Long, Long>(latch);
-      Callback<Long, Long> errback = new ErrBack<Long, Long>(latch, txnFail);
-      client.atomicIncrement(increment).addCallbacks(callback, errback);
+    if(i == batchSize) {
+      sinkCounter.incrementBatchCompleteCount();
     }
+    sinkCounter.addToEventDrainAttemptCount(i);
 
+    lock.lock();
     try {
-      latch.await();
-    } catch (InterruptedException e1) {
-      this.handleTransactionFailure(txn);
-      throw new EventDeliveryException("Sink interrupted while waiting" +
-          "for Hbase callbacks. Exception follows.", e1);
+      while ((callbacksReceived.get() < callbacksExpected.get())
+              && !txnFail.get()) {
+        try {
+          condition.await();
+        } catch (InterruptedException ex) {
+          logger.error("Interrupted while waiting for callbacks from HBase.");
+          try {
+            txn.rollback();
+          } finally {
+            txn.close();
+          }
+          Throwables.propagate(ex);
+        }
+      }
+    } finally {
+      lock.unlock();
     }
+
     /*
      * At this point, either the txn has failed
      * or all callbacks received and txn is successful.
@@ -176,6 +215,7 @@ public class AsyncHBaseSink extends Abst
     } else {
       try{
         txn.commit();
+        sinkCounter.addToEventDrainSuccessCount(i);
       } catch (Throwable e) {
         try{
           txn.rollback();
@@ -183,7 +223,6 @@ public class AsyncHBaseSink extends Abst
           logger.error("Exception in rollback. Rollback might not have been" +
               "successful." , e2);
         }
-        counterGroup.incrementAndGet("transaction.rollback");
         logger.error("Failed to commit transaction." +
             "Transaction rolled back.", e);
         if(e instanceof Error || e instanceof RuntimeException){
@@ -240,11 +279,17 @@ public class AsyncHBaseSink extends Abst
       logger.error("Could not instantiate event serializer." , e);
       Throwables.propagate(e);
     }
+
+    if(sinkCounter == null) {
+      sinkCounter = new SinkCounter(this.getName());
+    }
   }
   @Override
   public void start(){
-    Preconditions.checkArgument(client == null, "Please call stop " +
-        "before calling start on an old instance.");
+    Preconditions.checkArgument(client == null, "Please call stop "
+            + "before calling start on an old instance.");
+    sinkCounter.start();
+    sinkCounter.incrementConnectionCreatedCount();
     String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
     String zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
     if(zkBaseDir != null){
@@ -252,19 +297,35 @@ public class AsyncHBaseSink extends Abst
     } else {
       client = new HBaseClient(zkQuorum);
     }
-    CountDownLatch latch = new CountDownLatch(1);
-    AtomicBoolean fail = new AtomicBoolean(false);
+    final CountDownLatch latch = new CountDownLatch(1);
+    final AtomicBoolean fail = new AtomicBoolean(false);
     client.ensureTableFamilyExists(
-        tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks(
-            new SuccessCallback<Object, Object>(latch) ,
-            new ErrBack<Object, Object>(latch, fail));
+            tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks(
+            new Callback<Object, Object>() {
+              @Override
+              public Object call(Object arg) throws Exception {
+                latch.countDown();
+                return null;
+              }
+            },
+            new Callback<Object, Object>() {
+              @Override
+              public Object call(Object arg) throws Exception {
+                fail.set(true);
+                latch.countDown();
+                return null;
+              }
+            });
+
     try {
       latch.await();
     } catch (InterruptedException e) {
+      sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException(
           "Interrupted while waiting for Hbase Callbacks", e);
     }
     if(fail.get()){
+      sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException(
           "Could not start sink. " +
           "Table or column family does not exist in Hbase.");
@@ -279,8 +340,11 @@ public class AsyncHBaseSink extends Abst
   public void stop(){
     serializer.cleanUp();
     client.shutdown();
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
     client = null;
     open = false;
+    super.stop();
   }
 
   private void handleTransactionFailure(Transaction txn)
@@ -288,7 +352,6 @@ public class AsyncHBaseSink extends Abst
     try {
       txn.rollback();
     } catch (Throwable e) {
-      counterGroup.incrementAndGet("transaction.rollback");
       logger.error("Failed to commit transaction." +
           "Transaction rolled back.", e);
       if(e instanceof Error || e instanceof RuntimeException){
@@ -305,53 +368,55 @@ public class AsyncHBaseSink extends Abst
       txn.close();
     }
   }
-
-  private class SuccessCallback<R, T> implements Callback<R, T> {
-
-    private CountDownLatch latch;
-    public SuccessCallback(CountDownLatch latch){
-      this.latch = latch;
+  private class SuccessCallback<R,T> implements Callback<R,T> {
+    private Lock lock;
+    private AtomicInteger callbacksReceived;
+    private Condition condition;
+
+    public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
+            Condition condition) {
+      lock = lck;
+      this.callbacksReceived = callbacksReceived;
+      this.condition = condition;
     }
-
     @Override
-    public R call(T arg0) throws Exception {
-      latch.countDown();
+    public R call(T arg) throws Exception {
+      callbacksReceived.incrementAndGet();
+      lock.lock();
+      try{
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
       return null;
     }
   }
 
-  private class ErrBack<R, T> implements Callback<R, T> {
-
-    private CountDownLatch latch;
-    /*
-     * Reference to the boolean representing failure of the current transaction.
-     * Since each txn gets a new boolean, failure of one txn will not affect
-     * the next even if errbacks for the current txn come while the next one is
-     * being processed.
-     *
-     */
+  private class FailureCallback<R,T> implements Callback<R,T> {
+    private Lock lock;
+    private AtomicInteger callbacksReceived;
     private AtomicBoolean txnFail;
-    public ErrBack(CountDownLatch latch, AtomicBoolean txnFail){
-      this.latch = latch;
+    private Condition condition;
+
+    public FailureCallback(Lock lck, AtomicInteger callbacksReceived,
+            AtomicBoolean txnFail, Condition condition){
+      this.lock = lck;
+      this.callbacksReceived = callbacksReceived;
       this.txnFail = txnFail;
+      this.condition = condition;
     }
 
     @Override
-    public R call(T arg0) throws Exception {
-      /*
-       * getCount() and countDown are thread safe. countDown will not let
-       * count to go < 0 anyway.
-       * So even if multiple threads call this method simultaneously,
-       * it is ok - eventually one will call countDown and set count to 0,
-       * then all countDown calls are simply no-ops anyway, and the
-       * process thread is released at count == 0.
-       */
-      txnFail.set(true);
-      while(latch.getCount() > 0 ) {
-        latch.countDown();
+    public R call(T arg) throws Exception {
+      callbacksReceived.incrementAndGet();
+      this.txnFail.set(true);
+      lock.lock();
+      try {
+        condition.signal();
+      } finally {
+        lock.unlock();
       }
       return null;
     }
-
   }
 }