You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/17 01:14:50 UTC
git commit: FLUME-1842: AsyncHBaseSink timeout is not calculated
correctly
Updated Branches:
refs/heads/trunk ab0894c7f -> 11fada202
FLUME-1842: AsyncHBaseSink timeout is not calculated correctly
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/11fada20
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/11fada20
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/11fada20
Branch: refs/heads/trunk
Commit: 11fada2026e34a704d9f710bdd24766c306d040c
Parents: ab0894c
Author: Brock Noland <br...@apache.org>
Authored: Wed Jan 16 16:14:25 2013 -0800
Committer: Brock Noland <br...@apache.org>
Committed: Wed Jan 16 16:14:25 2013 -0800
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 +-
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 75 +++++++++++++--
.../hbase/HBaseSinkConfigurationConstants.java | 2 +-
.../flume/sink/hbase/TestAsyncHBaseSink.java | 63 ++++++++++--
4 files changed, 122 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 58a115e..aa92974 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1539,8 +1539,8 @@ zookeeperQuorum --
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
**columnFamily** -- The column family in Hbase to write to.
batchSize 100 Number of events to be written per txn.
-timeout -- The length of time (in milliseconds) the sink waits for acks from hbase for
- all events in a transaction. If no timeout is specified, the sink will wait forever.
+timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for
+ all events in a transaction.
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
serializer.* -- Properties to be passed to the serializer.
================ ============================================================ ====================================================================================
http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 6b34873..0b6f885 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -20,9 +20,12 @@ package org.apache.flume.sink.hbase;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
@@ -108,13 +111,20 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private long timeout;
private String zkQuorum;
private String zkBaseDir;
+ private ExecutorService sinkCallbackPool;
+ private boolean isTest;
public AsyncHBaseSink(){
this(null);
}
public AsyncHBaseSink(Configuration conf) {
+ this(conf, false);
+ }
+
+ AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) {
this.conf = conf;
+ isTest = isTimeoutTesting;
}
@Override
@@ -184,6 +194,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
}
}
}
+ client.flush();
} catch (Throwable e) {
this.handleTransactionFailure(txn);
this.checkIfChannelExceptionAndThrow(e);
@@ -194,11 +205,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
sinkCounter.addToEventDrainAttemptCount(i);
lock.lock();
+ long startTime = System.nanoTime();
+ long timeRemaining;
try {
while ((callbacksReceived.get() < callbacksExpected.get())
&& !txnFail.get()) {
+ timeRemaining = timeout - (System.nanoTime() - startTime);
+ timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0;
try {
- if(!condition.await(timeout, TimeUnit.MILLISECONDS)){
+ if(!condition.await(timeRemaining, TimeUnit.NANOSECONDS)){
txnFail.set(true);
logger.warn("HBase callbacks timed out. "
+ "Transaction will be rolled back.");
@@ -288,6 +303,8 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
+ "Sink will not timeout.");
timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
}
+ //Convert to nanos.
+ timeout = TimeUnit.MILLISECONDS.toNanos(timeout);
zkQuorum = context.getString(
HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim();
@@ -300,7 +317,8 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
conf = HBaseConfiguration.create();
}
zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
- zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+ zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+ HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
}
Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
"The Zookeeper quorum cannot be null and should be specified.");
@@ -316,11 +334,13 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
+ "before calling start on an old instance.");
sinkCounter.start();
sinkCounter.incrementConnectionCreatedCount();
- if(zkBaseDir != null){
- client = new HBaseClient(zkQuorum, zkBaseDir);
+ if (!isTest) {
+ sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat(this.getName() + " HBase Call Pool").build());
} else {
- client = new HBaseClient(zkQuorum);
+ sinkCallbackPool = Executors.newSingleThreadExecutor();
}
+ client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean fail = new AtomicBoolean(false);
client.ensureTableFamilyExists(
@@ -366,6 +386,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
client.shutdown();
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
+ sinkCallbackPool.shutdown();
+ try {
+ if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ sinkCallbackPool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for asynchbase sink pool to " +
+ "die", e);
+ sinkCallbackPool.shutdownNow();
+ }
+ sinkCallbackPool = null;
client = null;
conf = null;
open = false;
@@ -397,15 +428,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private Lock lock;
private AtomicInteger callbacksReceived;
private Condition condition;
+ private final boolean isTimeoutTesting;
public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
Condition condition) {
lock = lck;
this.callbacksReceived = callbacksReceived;
this.condition = condition;
+ isTimeoutTesting = isTest;
}
+
@Override
public R call(T arg) throws Exception {
+ if (isTimeoutTesting) {
+ try {
+ //tests set timeout to 10 seconds, so sleep for 4 seconds
+ TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4));
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ doCall();
+ return null;
+ }
+
+ private void doCall() throws Exception {
callbacksReceived.incrementAndGet();
lock.lock();
try{
@@ -413,7 +460,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
} finally {
lock.unlock();
}
- return null;
}
}
@@ -422,17 +468,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private AtomicInteger callbacksReceived;
private AtomicBoolean txnFail;
private Condition condition;
-
+ private final boolean isTimeoutTesting;
public FailureCallback(Lock lck, AtomicInteger callbacksReceived,
AtomicBoolean txnFail, Condition condition){
this.lock = lck;
this.callbacksReceived = callbacksReceived;
this.txnFail = txnFail;
this.condition = condition;
+ isTimeoutTesting = isTest;
}
@Override
public R call(T arg) throws Exception {
+ if (isTimeoutTesting) {
+ //tests set timeout to 10 seconds, so sleep for 4 seconds
+ try {
+ TimeUnit.NANOSECONDS.sleep(TimeUnit.SECONDS.toNanos(4));
+ } catch (InterruptedException e) {
+ //ignore
+ }
+ }
+ doCall();
+ return null;
+ }
+
+ private void doCall() throws Exception {
callbacksReceived.incrementAndGet();
this.txnFail.set(true);
lock.lock();
@@ -441,7 +501,6 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
} finally {
lock.unlock();
}
- return null;
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index fad026c..fb6bd4e 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -49,7 +49,7 @@ public class HBaseSinkConfigurationConstants {
public static final String CONFIG_TIMEOUT = "timeout";
- public static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+ public static final long DEFAULT_TIMEOUT = 60000;
public static final String CONFIG_KEYTAB = "kerberosKeytab";
http://git-wip-us.apache.org/repos/asf/flume/blob/11fada20/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index 1f61406..03c3e4c 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -59,6 +59,8 @@ import com.google.common.io.Files;
import com.google.common.primitives.Longs;
import java.lang.reflect.Method;
+import org.junit.After;
+
public class TestAsyncHBaseSink {
private static HBaseTestingUtility testUtility;
private static MiniZooKeeperCluster zookeeperCluster;
@@ -71,6 +73,7 @@ public class TestAsyncHBaseSink {
private static String plCol = "pc";
private static Context ctx = new Context();
private static String valBase = "testing hbase sink: jham";
+ private boolean deleteTable = true;
@BeforeClass
@@ -141,6 +144,8 @@ public class TestAsyncHBaseSink {
"org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer");
ctxMap.put("serializer.payloadColumn", plCol);
ctxMap.put("serializer.incrementColumn", inColumn);
+ ctxMap.put("keep-alive", "0");
+ ctxMap.put("timeout", "10000");
ctx.putAll(ctxMap);
}
@@ -151,13 +156,21 @@ public class TestAsyncHBaseSink {
FileUtils.deleteDirectory(new File(workDir));
}
+ @After
+ public void tearDownTest() throws Exception {
+ if (deleteTable) {
+ testUtility.deleteTable(tableName.getBytes());
+ }
+ }
+
@Test
public void testOneEvent() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
+ Configurables.configure(channel, ctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();
@@ -176,16 +189,16 @@ public class TestAsyncHBaseSink {
Assert.assertArrayEquals(e.getBody(), out);
out = results[1];
Assert.assertArrayEquals(Longs.toByteArray(1), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testThreeEvents() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
+ Configurables.configure(channel, ctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();
@@ -214,19 +227,46 @@ public class TestAsyncHBaseSink {
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
- testUtility.deleteTable(tableName.getBytes());
+ }
+
+ //This will fail without FLUME-1842's timeout fix - but with FLUME-1842's
+ //testing-oriented changes to the callback classes and using a single-threaded
+ //executor for tests.
+ @Test (expected = EventDeliveryException.class)
+ public void testTimeOut() throws Exception {
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
+ AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
+ true);
+ Configurables.configure(sink, ctx);
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, ctx);
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for(int i = 0; i < 3; i++){
+ Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+ channel.put(e);
+ }
+ tx.commit();
+ tx.close();
+ Assert.assertFalse(sink.isConfNull());
+ sink.process();
+ Assert.fail();
}
@Test
public void testMultipleBatches() throws Exception {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
ctx.put("batchSize", "2");
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
ctx.put("batchSize", "100");
Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
+ Configurables.configure(channel, ctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();
@@ -261,17 +301,17 @@ public class TestAsyncHBaseSink {
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testWithoutConfigurationObject() throws Exception{
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
ctx.put("batchSize", "2");
ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
- testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+ testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
AsyncHBaseSink sink = new AsyncHBaseSink();
Configurables.configure(sink, ctx);
// Reset context to values usable by other tests.
@@ -279,7 +319,7 @@ public class TestAsyncHBaseSink {
ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null);
ctx.put("batchSize", "100");
Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
+ Configurables.configure(channel, ctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();
@@ -318,18 +358,18 @@ public class TestAsyncHBaseSink {
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test(expected = FlumeException.class)
public void testMissingTable() throws Exception {
+ deleteTable = false;
ctx.put("batchSize", "2");
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
ctx.put("batchSize", "100");
Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
+ Configurables.configure(channel, ctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();
@@ -373,12 +413,13 @@ public class TestAsyncHBaseSink {
public void testHBaseFailure() throws Exception {
ctx.put("batchSize", "2");
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = false;
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration());
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
ctx.put("batchSize", "100");
Channel channel = new MemoryChannel();
- Configurables.configure(channel, new Context());
+ Configurables.configure(channel, ctx);
sink.setChannel(channel);
sink.start();
Transaction tx = channel.getTransaction();