You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2015/01/21 22:50:06 UTC
flume git commit: FLUME-2594: Close Async HBase Client if there are
large number of consecutive timeouts
Repository: flume
Updated Branches:
refs/heads/trunk 91c58804d -> 82631f811
FLUME-2594: Close Async HBase Client if there are large number of consecutive timeouts
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/82631f81
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/82631f81
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/82631f81
Branch: refs/heads/trunk
Commit: 82631f811a7c6a4d8e6ec886f80d1a21876947e5
Parents: 91c5880
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Jan 21 13:49:41 2015 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Jan 21 13:49:41 2015 -0800
----------------------------------------------------------------------
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 74 ++++++++++++++++++--
1 file changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/82631f81/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 1666be4..1d05189 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -127,12 +127,21 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private boolean batchIncrements = false;
private volatile int totalCallbacksReceived = 0;
private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
+ // The HBaseClient buffers requests until a callback is received. In the event of a
+ // timeout, there is no way to clear these buffers. If there is a major cluster issue, the
+ // buffers can grow too large and cause crashes. So if we hit a fixed number of consecutive
+ // HBase write failures/timeouts, close the HBase client (gracefully or not) and drop the
+ // reference to it so that the buffered data can be garbage collected.
+ private int consecutiveHBaseFailures = 0;
+ private boolean lastTxnFailed = false;
// Does not need to be thread-safe. Always called only from the sink's
// process method.
private final Comparator<byte[]> COMPARATOR = UnsignedBytes
.lexicographicalComparator();
+ private static final int MAX_CONSECUTIVE_FAILS = 10;
+
public AsyncHBaseSink(){
this(null);
}
@@ -162,6 +171,12 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
throw new EventDeliveryException("Sink was never opened. " +
"Please fix the configuration.");
}
+ if (client == null) {
+ client = initHBaseClient();
+ if (client == null) {
+ throw new EventDeliveryException("Could not establish connection to HBase!");
+ }
+ }
AtomicBoolean txnFail = new AtomicBoolean(false);
AtomicInteger callbacksReceived = new AtomicInteger(0);
AtomicInteger callbacksExpected = new AtomicInteger(0);
@@ -292,11 +307,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
*
*/
if (txnFail.get()) {
+ // We enter this if condition only if the failure was due to HBase failure, so we make sure
+ // we track the consecutive failures.
+ if (lastTxnFailed) {
+ consecutiveHBaseFailures++;
+ }
+ lastTxnFailed = true;
this.handleTransactionFailure(txn);
throw new EventDeliveryException("Could not write events to Hbase. " +
"Transaction failed, and rolled back.");
} else {
try {
+ lastTxnFailed = false;
+ consecutiveHBaseFailures = 0;
txn.commit();
txn.close();
sinkCounter.addToEventDrainSuccessCount(i);
@@ -414,7 +437,12 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(this.getName() + " HBase Call Pool").build());
logger.info("Callback pool created");
- if(!isTimeoutTest) {
+ client = initHBaseClient();
+ super.start();
+ }
+
+ private HBaseClient initHBaseClient() {
+ if (!isTimeoutTest) {
client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
} else {
client = new HBaseClient(zkQuorum, zkBaseDir,
@@ -454,8 +482,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
}
if(fail.get()){
sinkCounter.incrementConnectionFailedCount();
- client.shutdown();
- client = null;
+ if (client != null) {
+ shutdownHBaseClient();
+ }
throw new FlumeException(
"Could not start sink. " +
"Table or column family does not exist in Hbase.");
@@ -463,14 +492,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
open = true;
}
client.setFlushInterval((short) 0);
- super.start();
+ return client;
}
@Override
public void stop(){
serializer.cleanUp();
if (client != null) {
- client.shutdown();
+ shutdownHBaseClient();
}
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
@@ -496,8 +525,43 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
super.stop();
}
+ /**
+  * Shuts down the current HBase client and clears the {@code client} field.
+  *
+  * <p>The shutdown is asynchronous; this method blocks for at most {@code timeout}
+  * nanoseconds waiting for it to report either success or failure. Problems are logged
+  * rather than propagated, and {@code client} is set to {@code null} in every case.
+  */
+ private void shutdownHBaseClient() {
+   // Released by whichever of the two callbacks below fires first.
+   final CountDownLatch waiter = new CountDownLatch(1);
+   try {
+     client.shutdown().addCallback(new Callback<Object, Object>() {
+       @Override
+       public Object call(Object arg) throws Exception {
+         waiter.countDown();
+         return null;
+       }
+     }).addErrback(new Callback<Object, Object>() {
+       @Override
+       public Object call(Object arg) throws Exception {
+         logger.error("Failed to shutdown HBase client cleanly! HBase cluster might be down");
+         waiter.countDown();
+         return null;
+       }
+     });
+     // Bound the wait so an unresponsive cluster cannot block the caller indefinitely.
+     if (!waiter.await(timeout, TimeUnit.NANOSECONDS)) {
+       logger.error("HBase connection could not be closed within timeout! HBase cluster might " +
+           "be down!");
+     }
+   } catch (Exception ex) {
+     // Best effort: keep going, but record the cause so the failure can be diagnosed.
+     // NOTE(review): an InterruptedException from await() also lands here and the thread's
+     // interrupt status is not restored -- confirm whether that is intended.
+     logger.warn("Error while attempting to close connections to HBase", ex);
+   } finally {
+     // Drop the reference so the client, and any requests it still buffers, become eligible
+     // for garbage collection.
+     client = null;
+   }
+ }
+
private void handleTransactionFailure(Transaction txn)
throws EventDeliveryException {
+ if (consecutiveHBaseFailures >= MAX_CONSECUTIVE_FAILS) {
+ if (client != null) {
+ shutdownHBaseClient();
+ }
+ consecutiveHBaseFailures = 0;
+ }
try {
txn.rollback();
} catch (Throwable e) {