You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/17 01:14:50 UTC

git commit: FLUME-1842: AsyncHBaseSink timeout is not calculated correctly

Updated Branches:
  refs/heads/trunk ab0894c7f -> 11fada202


FLUME-1842: AsyncHBaseSink timeout is not calculated correctly

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/11fada20
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/11fada20
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/11fada20

Branch: refs/heads/trunk
Commit: 11fada2026e34a704d9f710bdd24766c306d040c
Parents: ab0894c
Author: Brock Noland <br...@apache.org>
Authored: Wed Jan 16 16:14:25 2013 -0800
Committer: Brock Noland <br...@apache.org>
Committed: Wed Jan 16 16:14:25 2013 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |    4 +-
 .../apache/flume/sink/hbase/AsyncHBaseSink.java    |   75 +++++++++++++--
 .../hbase/HBaseSinkConfigurationConstants.java     |    2 +-
 .../flume/sink/hbase/TestAsyncHBaseSink.java       |   63 ++++++++++--
 4 files changed, 122 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 58a115e..aa92974 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1539,8 +1539,8 @@ zookeeperQuorum   --
 znodeParent       /hbase                                                        The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
 **columnFamily**  --                                                            The column family in Hbase to write to.
 batchSize         100                                                           Number of events to be written per txn.
-timeout           --                                                            The length of time (in milliseconds) the sink waits for acks from hbase for
-                                                                                all events in a transaction. If no timeout is specified, the sink will wait forever.
+timeout           60000                                                         The length of time (in milliseconds) the sink waits for acks from hbase for
+                                                                                all events in a transaction.
 serializer        org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 serializer.*      --                                                            Properties to be passed to the serializer.
 ================  ============================================================  ====================================================================================

http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 6b34873..0b6f885 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -20,9 +20,12 @@ package org.apache.flume.sink.hbase;
 
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -108,13 +111,20 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private long timeout;
   private String zkQuorum;
   private String zkBaseDir;
+  private ExecutorService sinkCallbackPool;
+  private boolean isTest;
 
   public AsyncHBaseSink(){
     this(null);
   }
 
   public AsyncHBaseSink(Configuration conf) {
+    this(conf, false);
+  }
+
+  AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) {
     this.conf = conf;
+    isTest = isTimeoutTesting;
   }
 
   @Override
@@ -184,6 +194,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
           }
         }
       }
+      client.flush();
     } catch (Throwable e) {
       this.handleTransactionFailure(txn);
       this.checkIfChannelExceptionAndThrow(e);
@@ -194,11 +205,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     sinkCounter.addToEventDrainAttemptCount(i);
 
     lock.lock();
+    long startTime = System.nanoTime();
+    long timeRemaining;
     try {
       while ((callbacksReceived.get() < callbacksExpected.get())
               && !txnFail.get()) {
+        timeRemaining = timeout - (System.nanoTime() - startTime);
+        timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0;
         try {
-          if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
+          if(!condition.await(timeRemaining, TimeUnit.NANOSECONDS)){
             txnFail.set(true);
             logger.warn("HBase callbacks timed out. "
                     + "Transaction will be rolled back.");
@@ -288,6 +303,8 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
               + "Sink will not timeout.");
       timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
     }
+    //Convert to nanos.
+    timeout = TimeUnit.MILLISECONDS.toNanos(timeout);
 
     zkQuorum = context.getString(
         HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim();
@@ -300,7 +317,8 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
         conf = HBaseConfiguration.create();
       }
       zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
-      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+      zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
     }
     Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
         "The Zookeeper quorum cannot be null and should be specified.");
@@ -316,11 +334,13 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
             + "before calling start on an old instance.");
     sinkCounter.start();
     sinkCounter.incrementConnectionCreatedCount();
-    if(zkBaseDir != null){
-      client = new HBaseClient(zkQuorum, zkBaseDir);
+    if (!isTest) {
+      sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setNameFormat(this.getName() + " HBase Call Pool").build());
     } else {
-      client = new HBaseClient(zkQuorum);
+      sinkCallbackPool = Executors.newSingleThreadExecutor();
     }
+    client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
     final CountDownLatch latch = new CountDownLatch(1);
     final AtomicBoolean fail = new AtomicBoolean(false);
     client.ensureTableFamilyExists(
@@ -366,6 +386,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     client.shutdown();
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
+    sinkCallbackPool.shutdown();
+    try {
+      if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
+        sinkCallbackPool.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      logger.error("Interrupted while waiting for asynchbase sink pool to " +
+        "die", e);
+      sinkCallbackPool.shutdownNow();
+    }
+    sinkCallbackPool = null;
     client = null;
     conf = null;
     open = false;
@@ -397,15 +428,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     private Lock lock;
     private AtomicInteger callbacksReceived;
     private Condition condition;
+    private final boolean isTimeoutTesting;
 
     public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
             Condition condition) {
       lock = lck;
       this.callbacksReceived = callbacksReceived;
       this.condition = condition;
+      isTimeoutTesting = isTest;
     }
+
     @Override
     public R call(T arg) throws Exception {
+      if (isTimeoutTesting) {
+        try {
+          //tests set timeout to 10 seconds, so sleep for 4 seconds
+          TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4));
+        } catch (InterruptedException e) {
+          //ignore
+        }
+      }
+      doCall();
+      return null;
+    }
+
+    private void doCall() throws Exception {
       callbacksReceived.incrementAndGet();
       lock.lock();
       try{
@@ -413,7 +460,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       } finally {
         lock.unlock();
       }
-      return null;
     }
   }
 
@@ -422,17 +468,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     private AtomicInteger callbacksReceived;
     private AtomicBoolean txnFail;
     private Condition condition;
-
+    private final boolean isTimeoutTesting;
     public FailureCallback(Lock lck, AtomicInteger callbacksReceived,
             AtomicBoolean txnFail, Condition condition){
       this.lock = lck;
       this.callbacksReceived = callbacksReceived;
       this.txnFail = txnFail;
       this.condition = condition;
+      isTimeoutTesting = isTest;
     }
 
     @Override
     public R call(T arg) throws Exception {
+      if (isTimeoutTesting) {
+        //tests set timeout to 10 seconds, so sleep for 4 seconds
+        try {
+          TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4));
+        } catch (InterruptedException e) {
+          //ignore
+        }
+      }
+      doCall();
+      return null;
+    }
+
+    private void doCall() throws Exception {
       callbacksReceived.incrementAndGet();
       this.txnFail.set(true);
       lock.lock();
@@ -441,7 +501,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       } finally {
         lock.unlock();
       }
-      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index fad026c..fb6bd4e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -49,7 +49,7 @@ public class HBaseSinkConfigurationConstants {
 
   public static final String CONFIG_TIMEOUT = "timeout";
 
-  public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+  public static final long DEFAULT_TIMEOUT = 60000;
 
   public static final String CONFIG_KEYTAB = "kerberosKeytab";
 

http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 1f61406..03c3e4c 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -59,6 +59,8 @@ import com.google.common.io.Files;
 import com.google.common.primitives.Longs;
 import java.lang.reflect.Method;
 
+import org.junit.After;
+
 public class TestAsyncHBaseSink {
   private static HBaseTestingUtility testUtility;
   private static MiniZooKeeperCluster zookeeperCluster;
@@ -71,6 +73,7 @@ public class TestAsyncHBaseSink {
   private static String plCol = "pc";
   private static Context ctx = new Context();
   private static String valBase = "testing hbase sink: jham";
+  private boolean deleteTable = true;
 
 
   @BeforeClass
@@ -141,6 +144,8 @@ public class TestAsyncHBaseSink {
         "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer");
     ctxMap.put("serializer.payloadColumn", plCol);
     ctxMap.put("serializer.incrementColumn", inColumn);
+    ctxMap.put("keep-alive", "0");
+    ctxMap.put("timeout", "10000");
     ctx.putAll(ctxMap);
   }
 
@@ -151,13 +156,21 @@ public class TestAsyncHBaseSink {
     FileUtils.deleteDirectory(new File(workDir));
   }
 
+  @After
+  public void tearDownTest() throws Exception {
+    if (deleteTable) {
+      testUtility.deleteTable(tableName.getBytes());
+    }
+  }
+
   @Test
   public void testOneEvent() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
+    Configurables.configure(channel, ctx);
     sink.setChannel(channel);
     sink.start();
     Transaction tx = channel.getTransaction();
@@ -176,16 +189,16 @@ public class TestAsyncHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testThreeEvents() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
+    Configurables.configure(channel, ctx);
     sink.setChannel(channel);
     sink.start();
     Transaction tx = channel.getTransaction();
@@ -214,19 +227,46 @@ public class TestAsyncHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  //This will fail without FLUME-1842's timeout fix - but with FLUME-1842's
+  //testing oriented changes to the callback classes and using single threaded
+  //executor for tests.
+  @Test (expected = EventDeliveryException.class)
+  public void testTimeOut() throws Exception {
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
+      true);
+    Configurables.configure(sink, ctx);
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, ctx);
+    sink.setChannel(channel);
+    sink.start();
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    for(int i = 0; i < 3; i++){
+      Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+      channel.put(e);
+    }
+    tx.commit();
+    tx.close();
+    Assert.assertFalse(sink.isConfNull());
+    sink.process();
+    Assert.fail();
   }
 
   @Test
   public void testMultipleBatches() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
     ctx.put("batchSize", "2");
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
     ctx.put("batchSize", "100");
     Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
+    Configurables.configure(channel, ctx);
     sink.setChannel(channel);
     sink.start();
     Transaction tx = channel.getTransaction();
@@ -261,17 +301,17 @@ public class TestAsyncHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testWithoutConfigurationObject() throws Exception{
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = true;
     ctx.put("batchSize", "2");
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
         testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-        testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     AsyncHBaseSink sink = new AsyncHBaseSink();
     Configurables.configure(sink, ctx);
     // Reset context to values usable by other tests.
@@ -279,7 +319,7 @@ public class TestAsyncHBaseSink {
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null);
     ctx.put("batchSize", "100");
     Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
+    Configurables.configure(channel, ctx);
     sink.setChannel(channel);
     sink.start();
     Transaction tx = channel.getTransaction();
@@ -318,18 +358,18 @@ public class TestAsyncHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test(expected = FlumeException.class)
   public void testMissingTable() throws Exception {
+    deleteTable = false;
     ctx.put("batchSize", "2");
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
     ctx.put("batchSize", "100");
     Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
+    Configurables.configure(channel, ctx);
     sink.setChannel(channel);
     sink.start();
     Transaction tx = channel.getTransaction();
@@ -373,12 +413,13 @@ public class TestAsyncHBaseSink {
   public void testHBaseFailure() throws Exception {
     ctx.put("batchSize", "2");
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    deleteTable = false;
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
     ctx.put("batchSize", "100");
     Channel channel = new MemoryChannel();
-    Configurables.configure(channel, new Context());
+    Configurables.configure(channel, ctx);
     sink.setChannel(channel);
     sink.start();
     Transaction tx = channel.getTransaction();