You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2012/07/30 22:05:37 UTC

git commit: FLUME-1401: Asynchbase sink should be configurable to support timeout

Updated Branches:
  refs/heads/trunk 8cf91191a -> 5c3d966a7


FLUME-1401: Asynchbase sink should be configurable to support timeout

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5c3d966a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5c3d966a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5c3d966a

Branch: refs/heads/trunk
Commit: 5c3d966a7923b983271de882363cb1258460978e
Parents: 8cf9119
Author: Brock Noland <br...@apache.org>
Authored: Mon Jul 30 15:02:08 2012 -0500
Committer: Brock Noland <br...@apache.org>
Committed: Mon Jul 30 15:02:08 2012 -0500

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   20 ++--
 .../apache/flume/sink/hbase/AsyncHBaseSink.java    |  111 ++++++++-------
 .../hbase/HBaseSinkConfigurationConstants.java     |    3 +
 3 files changed, 75 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5c3d966a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 06bcbec..45dd7cc 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -30,7 +30,7 @@ different sources to a centralized data store.
 
 Apache Flume is a top level project at the Apache Software Foundation.
 There are currently two release code lines available, versions 0.9.x and 1.x.
-This documentation applies to the 1.x codeline.  
+This documentation applies to the 1.x codeline.
 Please click here for
 `the Flume 0.9.x User Guide <http://archive.cloudera.com/cdh/3/flume/UserGuide/>`_.
 
@@ -155,7 +155,7 @@ A simple example
 Here, we give an example configuration file, describing a single-node Flume deployment. This configuration lets a user generate events and subsequently logs them to the console.
 
 .. code-block:: properties
-   
+
   # example.conf: A single-node Flume configuration
 
   # Name the components on this agent
@@ -175,7 +175,7 @@ Here, we give an example configuration file, describing a single-node Flume depl
   agent1.channels.channel1.type = memory
   agent1.channels.channel1.capacity = 1000
   agent1.channels.channel1.transactionCapacity = 100
- 
+
   # Bind the source and sink to the channel
   agent1.sources.source1.channels = channel1
   agent1.sinks.sink1.channel = channel1
@@ -643,7 +643,7 @@ interceptors.*
              of indicating to the application writing the log file that it needs to
              retain the log or that the event hasn't been sent, for some reason. If
              this doesn't make sense, you need only know this: Your application can
-             never guarantee data has been received when using a unidirectional 
+             never guarantee data has been received when using a unidirectional
              asynchronous interface such as ExecSource! As an extension of this
              warning - and to be completely clear - there is absolutely zero guarantee
              of event delivery when using this source. You have been warned.
@@ -1204,17 +1204,19 @@ This sink is still experimental.
 The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
 Required properties are in **bold**.
 
-================  ============================================================  =============================================================================
+================  ============================================================  ====================================================================================
 Property Name     Default                                                       Description
-================  ============================================================  =============================================================================
+================  ============================================================  ====================================================================================
 **channel**       --
 **type**          --                                                            The component type name, needs to be ``org.apache.flume.sink.hbase.AsyncHBaseSink``
 **table**         --                                                            The name of the table in Hbase to write to.
 **columnFamily**  --                                                            The column family in Hbase to write to.
 batchSize         100                                                           Number of events to be written per txn.
+timeout           --                                                            The length of time (in milliseconds) the sink waits for acks from hbase for
+                                                                                all events in a transaction. If no timeout is specified, the sink will wait forever.
 serializer        org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 serializer.*      --                                                            Properties to be passed to the serializer.
-================  ============================================================  =============================================================================
+================  ============================================================  ====================================================================================
 
 Example for agent named **agent_foo**:
 
@@ -1361,8 +1363,8 @@ keep-alive            3                                 Amount of time (in sec)
 write-timeout         3                                 Amount of time (in sec) to wait for a write operation
 ====================  ================================  ========================================================
 
-.. note:: By default the File Channel uses paths for checkpoint and data 
-          directories that are within the user home as specified above. 
+.. note:: By default the File Channel uses paths for checkpoint and data
+          directories that are within the user home as specified above.
           As a result if you have more than one File Channel instances
           active within the agent, only one will be able to lock the
           directories and cause the other channel initialization to fail.

http://git-wip-us.apache.org/repos/asf/flume/blob/5c3d966a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 6df1f33..1598f26 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -43,10 +43,12 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.stumbleupon.async.Callback;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.flume.ChannelException;
 import org.apache.flume.instrumentation.SinkCounter;
 
 /**
@@ -74,6 +76,9 @@ import org.apache.flume.instrumentation.SinkCounter;
 * maximum number of events the sink will commit per transaction. The default
 * batch size is 100 events.
 * <p>
+* <tt>timeout: </tt> The length of time in milliseconds the sink waits for
+* callbacks from hbase for all events in a transaction.
+* If no timeout is specified, the sink will wait forever.<p>
 *
 * <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple
 * rows. So if a subset of events in a batch are written to disk by Hbase and
@@ -99,6 +104,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private Transaction txn;
   private volatile boolean open = false;
   private SinkCounter sinkCounter;
+  private long timeout;
 
   public AsyncHBaseSink(){
     conf = HBaseConfiguration.create();
@@ -145,35 +151,40 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
     Status status = Status.READY;
     Channel channel = getChannel();
-    txn = channel.getTransaction();
-    txn.begin();
     int i = 0;
-    for (; i < batchSize; i++) {
-      Event event = channel.take();
-      if (event == null) {
-        status = Status.BACKOFF;
-        if (i == 0) {
-          sinkCounter.incrementBatchEmptyCount();
+    try {
+      txn = channel.getTransaction();
+      txn.begin();
+      for (; i < batchSize; i++) {
+        Event event = channel.take();
+        if (event == null) {
+          status = Status.BACKOFF;
+          if (i == 0) {
+            sinkCounter.incrementBatchEmptyCount();
+          } else {
+            sinkCounter.incrementBatchUnderflowCount();
+          }
+          break;
         } else {
-          sinkCounter.incrementBatchUnderflowCount();
-        }
-        break;
-      } else {
-        serializer.setEvent(event);
-        List<PutRequest> actions = serializer.getActions();
-        List<AtomicIncrementRequest> increments = serializer.getIncrements();
-        callbacksExpected.addAndGet(actions.size() + increments.size());
+          serializer.setEvent(event);
+          List<PutRequest> actions = serializer.getActions();
+          List<AtomicIncrementRequest> increments = serializer.getIncrements();
+          callbacksExpected.addAndGet(actions.size() + increments.size());
 
-        for (PutRequest action : actions) {
-          client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
-        }
-        for (AtomicIncrementRequest increment : increments) {
-          client.atomicIncrement(increment).addCallbacks(
-                  incrementSuccessCallback, incrementFailureCallback);
+          for (PutRequest action : actions) {
+            client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
+          }
+          for (AtomicIncrementRequest increment : increments) {
+            client.atomicIncrement(increment).addCallbacks(
+                    incrementSuccessCallback, incrementFailureCallback);
+          }
         }
       }
+    } catch (Throwable e) {
+      this.handleTransactionFailure(txn);
+      this.checkIfChannelExceptionAndThrow(e);
     }
-    if(i == batchSize) {
+    if (i == batchSize) {
       sinkCounter.incrementBatchCompleteCount();
     }
     sinkCounter.addToEventDrainAttemptCount(i);
@@ -183,14 +194,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       while ((callbacksReceived.get() < callbacksExpected.get())
               && !txnFail.get()) {
         try {
-          condition.await();
-        } catch (InterruptedException ex) {
-          logger.error("Interrupted while waiting for callbacks from HBase.");
-          try {
-            txn.rollback();
-          } finally {
-            txn.close();
+          if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
+            txnFail.set(true);
+            logger.warn("HBase callbacks timed out. "
+                    + "Transaction will be rolled back.");
           }
+        } catch (Exception ex) {
+          logger.error("Exception while waiting for callbacks from HBase.");
+          this.handleTransactionFailure(txn);
           Throwables.propagate(ex);
         }
       }
@@ -215,28 +226,11 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     } else {
       try{
         txn.commit();
+        txn.close();
         sinkCounter.addToEventDrainSuccessCount(i);
       } catch (Throwable e) {
-        try{
-          txn.rollback();
-        } catch (Exception e2) {
-          logger.error("Exception in rollback. Rollback might not have been" +
-              "successful." , e2);
-        }
-        logger.error("Failed to commit transaction." +
-            "Transaction rolled back.", e);
-        if(e instanceof Error || e instanceof RuntimeException){
-          logger.error("Failed to commit transaction." +
-              "Transaction rolled back.", e);
-          Throwables.propagate(e);
-        } else {
-          logger.error("Failed to commit transaction." +
-              "Transaction rolled back.", e);
-          throw new EventDeliveryException("Failed to commit transaction." +
-              "Transaction rolled back.", e);
-        }
-      } finally {
-        txn.close();
+        this.handleTransactionFailure(txn);
+        this.checkIfChannelExceptionAndThrow(e);
       }
     }
 
@@ -283,6 +277,13 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     if(sinkCounter == null) {
       sinkCounter = new SinkCounter(this.getName());
     }
+    timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT,
+            HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT);
+    if(timeout <= 0){
+      logger.warn("Timeout should be positive for Hbase sink. "
+              + "Sink will not timeout.");
+      timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
+    }
   }
   @Override
   public void start(){
@@ -419,4 +420,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       return null;
     }
   }
+
+  private void checkIfChannelExceptionAndThrow(Throwable e)
+          throws EventDeliveryException {
+    if (e instanceof ChannelException) {
+      throw new EventDeliveryException("Error in processing transaction.", e);
+    } else if (e instanceof Error || e instanceof RuntimeException) {
+      Throwables.propagate(e);
+    }
+    throw new EventDeliveryException("Error in processing transaction.", e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/5c3d966a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index a16cda8..62f7097 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -44,5 +44,8 @@ public class HBaseSinkConfigurationConstants {
    */
   public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER + ".";
 
+  public static final String CONFIG_TIMEOUT = "timeout";
+
+  public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
 
 }