You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@flume.apache.org by ja...@apache.org on 2012/07/27 18:48:43 UTC
svn commit: r1366445 - /flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
Author: jarcec
Date: Fri Jul 27 16:48:42 2012
New Revision: 1366445
URL: http://svn.apache.org/viewvc?rev=1366445&view=rev
Log:
FLUME-1398. Improve concurrency for async hbase sink.
(Hari Shreedharan via Jarek Jarcec Cecho)
Modified:
flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
Modified: flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java?rev=1366445&r1=1366444&r2=1366445&view=diff
==============================================================================
--- flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java (original)
+++ flume/trunk/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java Fri Jul 27 16:48:42 2012
@@ -18,14 +18,12 @@
*/
package org.apache.flume.sink.hbase;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
@@ -45,6 +43,11 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.stumbleupon.async.Callback;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.flume.instrumentation.SinkCounter;
/**
*
@@ -86,8 +89,8 @@ public class AsyncHBaseSink extends Abst
private String tableName;
private byte[] columnFamily;
private long batchSize;
- private CounterGroup counterGroup = new CounterGroup();
- private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
+ private static final Logger logger =
+ LoggerFactory.getLogger(AsyncHBaseSink.class);
private AsyncHbaseEventSerializer serializer;
private String eventSerializerType;
private Context serializerContext;
@@ -95,6 +98,7 @@ public class AsyncHBaseSink extends Abst
private Configuration conf;
private Transaction txn;
private volatile boolean open = false;
+ private SinkCounter sinkCounter;
public AsyncHBaseSink(){
conf = HBaseConfiguration.create();
@@ -117,48 +121,83 @@ public class AsyncHBaseSink extends Abst
"Please fix the configuration.");
}
AtomicBoolean txnFail = new AtomicBoolean(false);
+ AtomicInteger callbacksReceived = new AtomicInteger(0);
+ AtomicInteger callbacksExpected = new AtomicInteger(0);
+ final Lock lock = new ReentrantLock();
+ final Condition condition = lock.newCondition();
+ /*
+ * Callbacks can be reused per transaction, since they share the same
+ * locks and conditions.
+ */
+ Callback<Object, Object> putSuccessCallback =
+ new SuccessCallback<Object, Object>(
+ lock, callbacksReceived, condition);
+ Callback<Object, Object> putFailureCallback =
+ new FailureCallback<Object, Object>(
+ lock, callbacksReceived, txnFail, condition);
+
+ Callback<Long, Long> incrementSuccessCallback =
+ new SuccessCallback<Long, Long>(
+ lock, callbacksReceived, condition);
+ Callback<Long, Long> incrementFailureCallback =
+ new FailureCallback<Long, Long>(
+ lock, callbacksReceived, txnFail, condition);
+
Status status = Status.READY;
Channel channel = getChannel();
txn = channel.getTransaction();
txn.begin();
- List<PutRequest> actions = new LinkedList<PutRequest>();
- List<AtomicIncrementRequest> increments =
- new LinkedList<AtomicIncrementRequest>();
- for(int i = 0; i < batchSize; i++){
+ int i = 0;
+ for (; i < batchSize; i++) {
Event event = channel.take();
- if(event == null){
+ if (event == null) {
status = Status.BACKOFF;
- counterGroup.incrementAndGet("channel.underflow");
+ if (i == 0) {
+ sinkCounter.incrementBatchEmptyCount();
+ } else {
+ sinkCounter.incrementBatchUnderflowCount();
+ }
break;
} else {
serializer.setEvent(event);
- actions.addAll(serializer.getActions());
- increments.addAll(serializer.getIncrements());
+ List<PutRequest> actions = serializer.getActions();
+ List<AtomicIncrementRequest> increments = serializer.getIncrements();
+ callbacksExpected.addAndGet(actions.size() + increments.size());
+
+ for (PutRequest action : actions) {
+ client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
+ }
+ for (AtomicIncrementRequest increment : increments) {
+ client.atomicIncrement(increment).addCallbacks(
+ incrementSuccessCallback, incrementFailureCallback);
+ }
}
}
- CountDownLatch latch =
- new CountDownLatch(actions.size() + increments.size());
- for(PutRequest action : actions) {
- Callback<Object, Object> callback =
- new SuccessCallback<Object, Object>(latch);
- Callback<Object, Object> errback =
- new ErrBack<Object, Object>(latch, txnFail);
- client.put(action).addCallbacks(callback, errback);
- }
- for(AtomicIncrementRequest increment : increments) {
- Callback<Long, Long> callback =
- new SuccessCallback<Long, Long>(latch);
- Callback<Long, Long> errback = new ErrBack<Long, Long>(latch, txnFail);
- client.atomicIncrement(increment).addCallbacks(callback, errback);
+ if(i == batchSize) {
+ sinkCounter.incrementBatchCompleteCount();
}
+ sinkCounter.addToEventDrainAttemptCount(i);
+ lock.lock();
try {
- latch.await();
- } catch (InterruptedException e1) {
- this.handleTransactionFailure(txn);
- throw new EventDeliveryException("Sink interrupted while waiting" +
- "for Hbase callbacks. Exception follows.", e1);
+ while ((callbacksReceived.get() < callbacksExpected.get())
+ && !txnFail.get()) {
+ try {
+ condition.await();
+ } catch (InterruptedException ex) {
+ logger.error("Interrupted while waiting for callbacks from HBase.");
+ try {
+ txn.rollback();
+ } finally {
+ txn.close();
+ }
+ Throwables.propagate(ex);
+ }
+ }
+ } finally {
+ lock.unlock();
}
+
/*
* At this point, either the txn has failed
* or all callbacks received and txn is successful.
@@ -176,6 +215,7 @@ public class AsyncHBaseSink extends Abst
} else {
try{
txn.commit();
+ sinkCounter.addToEventDrainSuccessCount(i);
} catch (Throwable e) {
try{
txn.rollback();
@@ -183,7 +223,6 @@ public class AsyncHBaseSink extends Abst
logger.error("Exception in rollback. Rollback might not have been" +
"successful." , e2);
}
- counterGroup.incrementAndGet("transaction.rollback");
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
if(e instanceof Error || e instanceof RuntimeException){
@@ -240,11 +279,17 @@ public class AsyncHBaseSink extends Abst
logger.error("Could not instantiate event serializer." , e);
Throwables.propagate(e);
}
+
+ if(sinkCounter == null) {
+ sinkCounter = new SinkCounter(this.getName());
+ }
}
@Override
public void start(){
- Preconditions.checkArgument(client == null, "Please call stop " +
- "before calling start on an old instance.");
+ Preconditions.checkArgument(client == null, "Please call stop "
+ + "before calling start on an old instance.");
+ sinkCounter.start();
+ sinkCounter.incrementConnectionCreatedCount();
String zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
String zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
if(zkBaseDir != null){
@@ -252,19 +297,35 @@ public class AsyncHBaseSink extends Abst
} else {
client = new HBaseClient(zkQuorum);
}
- CountDownLatch latch = new CountDownLatch(1);
- AtomicBoolean fail = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean fail = new AtomicBoolean(false);
client.ensureTableFamilyExists(
- tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks(
- new SuccessCallback<Object, Object>(latch) ,
- new ErrBack<Object, Object>(latch, fail));
+ tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks(
+ new Callback<Object, Object>() {
+ @Override
+ public Object call(Object arg) throws Exception {
+ latch.countDown();
+ return null;
+ }
+ },
+ new Callback<Object, Object>() {
+ @Override
+ public Object call(Object arg) throws Exception {
+ fail.set(true);
+ latch.countDown();
+ return null;
+ }
+ });
+
try {
latch.await();
} catch (InterruptedException e) {
+ sinkCounter.incrementConnectionFailedCount();
throw new FlumeException(
"Interrupted while waiting for Hbase Callbacks", e);
}
if(fail.get()){
+ sinkCounter.incrementConnectionFailedCount();
throw new FlumeException(
"Could not start sink. " +
"Table or column family does not exist in Hbase.");
@@ -279,8 +340,11 @@ public class AsyncHBaseSink extends Abst
public void stop(){
serializer.cleanUp();
client.shutdown();
+ sinkCounter.incrementConnectionClosedCount();
+ sinkCounter.stop();
client = null;
open = false;
+ super.stop();
}
private void handleTransactionFailure(Transaction txn)
@@ -288,7 +352,6 @@ public class AsyncHBaseSink extends Abst
try {
txn.rollback();
} catch (Throwable e) {
- counterGroup.incrementAndGet("transaction.rollback");
logger.error("Failed to commit transaction." +
"Transaction rolled back.", e);
if(e instanceof Error || e instanceof RuntimeException){
@@ -305,53 +368,55 @@ public class AsyncHBaseSink extends Abst
txn.close();
}
}
-
- private class SuccessCallback<R, T> implements Callback<R, T> {
-
- private CountDownLatch latch;
- public SuccessCallback(CountDownLatch latch){
- this.latch = latch;
+ private class SuccessCallback<R,T> implements Callback<R,T> {
+ private Lock lock;
+ private AtomicInteger callbacksReceived;
+ private Condition condition;
+
+ public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
+ Condition condition) {
+ lock = lck;
+ this.callbacksReceived = callbacksReceived;
+ this.condition = condition;
}
-
@Override
- public R call(T arg0) throws Exception {
- latch.countDown();
+ public R call(T arg) throws Exception {
+ callbacksReceived.incrementAndGet();
+ lock.lock();
+ try{
+ condition.signal();
+ } finally {
+ lock.unlock();
+ }
return null;
}
}
- private class ErrBack<R, T> implements Callback<R, T> {
-
- private CountDownLatch latch;
- /*
- * Reference to the boolean representing failure of the current transaction.
- * Since each txn gets a new boolean, failure of one txn will not affect
- * the next even if errbacks for the current txn come while the next one is
- * being processed.
- *
- */
+ private class FailureCallback<R,T> implements Callback<R,T> {
+ private Lock lock;
+ private AtomicInteger callbacksReceived;
private AtomicBoolean txnFail;
- public ErrBack(CountDownLatch latch, AtomicBoolean txnFail){
- this.latch = latch;
+ private Condition condition;
+
+ public FailureCallback(Lock lck, AtomicInteger callbacksReceived,
+ AtomicBoolean txnFail, Condition condition){
+ this.lock = lck;
+ this.callbacksReceived = callbacksReceived;
this.txnFail = txnFail;
+ this.condition = condition;
}
@Override
- public R call(T arg0) throws Exception {
- /*
- * getCount() and countDown are thread safe. countDown will not let
- * count to go < 0 anyway.
- * So even if multiple threads call this method simultaneously,
- * it is ok - eventually one will call countDown and set count to 0,
- * then all countDown calls are simply no-ops anyway, and the
- * process thread is released at count == 0.
- */
- txnFail.set(true);
- while(latch.getCount() > 0 ) {
- latch.countDown();
+ public R call(T arg) throws Exception {
+ callbacksReceived.incrementAndGet();
+ this.txnFail.set(true);
+ lock.lock();
+ try {
+ condition.signal();
+ } finally {
+ lock.unlock();
}
return null;
}
-
}
}