You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/10/04 02:27:51 UTC
git commit: FLUME-2202. AsyncHBaseSink should coalesce increments to
reduce RPC roundtrips
Updated Branches:
refs/heads/trunk b84d01615 -> c4e2129fd
FLUME-2202. AsyncHBaseSink should coalesce increments to reduce RPC roundtrips
(Hari Shreedharan via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c4e2129f
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c4e2129f
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c4e2129f
Branch: refs/heads/trunk
Commit: c4e2129fd12f97303a1b8120a2ecf7da456e1b77
Parents: b84d016
Author: Mike Percy <mp...@cloudera.com>
Authored: Thu Oct 3 17:25:57 2013 -0700
Committer: Mike Percy <mp...@cloudera.com>
Committed: Thu Oct 3 17:25:57 2013 -0700
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 38 +++---
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 121 +++++++++++++++++--
.../hbase/HBaseSinkConfigurationConstants.java | 4 +
.../hbase/IncrementAsyncHBaseSerializer.java | 78 ++++++++++++
.../flume/sink/hbase/TestAsyncHBaseSink.java | 82 ++++++++++++-
5 files changed, 291 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 5a59b56..dc8d05d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1850,30 +1850,32 @@ AsyncHBaseSink
''''''''''''''
This sink writes data to HBase using an asynchronous model. A class implementing
-AsyncHbaseEventSerializer
-which is specified by the configuration is used to convert the events into
+AsyncHbaseEventSerializer which is specified by the configuration is used to convert the events into
HBase puts and/or increments. These puts and increments are then written
-to HBase. This sink provides the same consistency guarantees as HBase,
+to HBase. This sink uses the `Asynchbase API <https://github.com/OpenTSDB/asynchbase>`_ to write to
+HBase. This sink provides the same consistency guarantees as HBase,
which is currently row-wise atomicity. In the event of Hbase failing to
write certain events, the sink will replay all events in that transaction.
The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
Required properties are in **bold**.
-================ ============================================================ ====================================================================================
-Property Name Default Description
-================ ============================================================ ====================================================================================
-**channel** --
-**type** -- The component type name, needs to be ``asynchbase``
-**table** -- The name of the table in Hbase to write to.
-zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
-znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
-**columnFamily** -- The column family in Hbase to write to.
-batchSize 100 Number of events to be written per txn.
-timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for
- all events in a transaction.
-serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
-serializer.* -- Properties to be passed to the serializer.
-================ ============================================================ ====================================================================================
+=================== ============================================================ ====================================================================================
+Property Name Default Description
+=================== ============================================================ ====================================================================================
+**channel** --
+**type** -- The component type name, needs to be ``asynchbase``
+**table** -- The name of the table in Hbase to write to.
+zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
+znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
+**columnFamily** -- The column family in Hbase to write to.
+batchSize 100 Number of events to be written per txn.
+coalesceIncrements false Whether the sink should coalesce multiple increments to the same cell within a batch. This may give
+ better performance if there are many increments to a limited number of cells.
+timeout 60000 The length of time (in milliseconds) the sink waits for acks from hbase for
+ all events in a transaction.
+serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
+serializer.* -- Properties to be passed to the serializer.
+=================== ============================================================ ====================================================================================
Note that this sink takes the Zookeeper Quorum and parent znode information in
the configuration. Zookeeper Quorum and parent node configuration may be
http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 5e297b1..0545554 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -18,13 +18,19 @@
*/
package org.apache.flume.sink.hbase;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.UnsignedBytes;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -113,20 +119,32 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private String zkQuorum;
private String zkBaseDir;
private ExecutorService sinkCallbackPool;
- private boolean isTest;
+ private boolean isTimeoutTest;
+ private boolean isCoalesceTest;
private boolean enableWal = true;
+ private boolean batchIncrements = false;
+ private volatile int totalCallbacksReceived = 0;
+ private Map<CellIdentifier, AtomicIncrementRequest> incrementBuffer;
+
+ // Does not need to be thread-safe. Always called only from the sink's
+ // process method.
+ private final Comparator<byte[]> COMPARATOR = UnsignedBytes
+ .lexicographicalComparator();
public AsyncHBaseSink(){
this(null);
}
public AsyncHBaseSink(Configuration conf) {
- this(conf, false);
+ this(conf, false, false);
}
- AsyncHBaseSink(Configuration conf, boolean isTimeoutTesting) {
+ @VisibleForTesting
+ AsyncHBaseSink(Configuration conf, boolean isTimeoutTest,
+ boolean isCoalesceTest) {
this.conf = conf;
- isTest = isTimeoutTesting;
+ this.isTimeoutTest = isTimeoutTest;
+ this.isCoalesceTest = isCoalesceTest;
}
@Override
@@ -138,7 +156,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
* the next one is being processed.
*
*/
- if(!open){
+ if (!open) {
throw new EventDeliveryException("Sink was never opened. " +
"Please fix the configuration.");
}
@@ -147,6 +165,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
AtomicInteger callbacksExpected = new AtomicInteger(0);
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
+ if (incrementBuffer != null) {
+ incrementBuffer.clear();
+ }
/*
* Callbacks can be reused per transaction, since they share the same
* locks and conditions.
@@ -185,18 +206,41 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
serializer.setEvent(event);
List<PutRequest> actions = serializer.getActions();
List<AtomicIncrementRequest> increments = serializer.getIncrements();
- callbacksExpected.addAndGet(actions.size() + increments.size());
+ callbacksExpected.addAndGet(actions.size());
+ if (!batchIncrements) {
+ callbacksExpected.addAndGet(increments.size());
+ }
for (PutRequest action : actions) {
action.setDurable(enableWal);
client.put(action).addCallbacks(putSuccessCallback, putFailureCallback);
}
for (AtomicIncrementRequest increment : increments) {
- client.atomicIncrement(increment).addCallbacks(
- incrementSuccessCallback, incrementFailureCallback);
+ if (batchIncrements) {
+ CellIdentifier identifier = new CellIdentifier(increment.key(),
+ increment.qualifier());
+ AtomicIncrementRequest request
+ = incrementBuffer.get(identifier);
+ if (request == null) {
+ incrementBuffer.put(identifier, increment);
+ } else {
+ request.setAmount(request.getAmount() + increment.getAmount());
+ }
+ } else {
+ client.atomicIncrement(increment).addCallbacks(
+ incrementSuccessCallback, incrementFailureCallback);
+ }
}
}
}
+ if (batchIncrements) {
+ Collection<AtomicIncrementRequest> increments = incrementBuffer.values();
+ for (AtomicIncrementRequest increment : increments) {
+ client.atomicIncrement(increment).addCallbacks(
+ incrementSuccessCallback, incrementFailureCallback);
+ }
+ callbacksExpected.addAndGet(increments.size());
+ }
client.flush();
} catch (Throwable e) {
this.handleTransactionFailure(txn);
@@ -216,7 +260,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
timeRemaining = timeout - (System.nanoTime() - startTime);
timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0;
try {
- if(!condition.await(timeRemaining, TimeUnit.NANOSECONDS)){
+ if (!condition.await(timeRemaining, TimeUnit.NANOSECONDS)) {
txnFail.set(true);
logger.warn("HBase callbacks timed out. "
+ "Transaction will be rolled back.");
@@ -231,6 +275,10 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
lock.unlock();
}
+ if (isCoalesceTest) {
+ totalCallbacksReceived += callbacksReceived.get();
+ }
+
/*
* At this point, either the txn has failed
* or all callbacks received and txn is successful.
@@ -246,7 +294,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
throw new EventDeliveryException("Could not write events to Hbase. " +
"Transaction failed, and rolled back.");
} else {
- try{
+ try {
txn.commit();
txn.close();
sinkCounter.addToEventDrainSuccessCount(i);
@@ -334,6 +382,21 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
"All writes to HBase will have WAL disabled, and any data in the " +
"memstore of this region in the Region Server could be lost!");
}
+
+ batchIncrements = context.getBoolean(
+ HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+ HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+
+ if(batchIncrements) {
+ incrementBuffer = Maps.newHashMap();
+ logger.info("Increment coalescing is enabled. Increments will be " +
+ "buffered.");
+ }
+ }
+
+ @VisibleForTesting
+ int getTotalCallbacksReceived() {
+ return totalCallbacksReceived; // only accumulated when isCoalesceTest is set
}
@VisibleForTesting
@@ -346,7 +409,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
+ "before calling start on an old instance.");
sinkCounter.start();
sinkCounter.incrementConnectionCreatedCount();
- if (!isTest) {
+ if (!isTimeoutTest) {
sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(this.getName() + " HBase Call Pool").build());
} else {
@@ -447,7 +510,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
lock = lck;
this.callbacksReceived = callbacksReceived;
this.condition = condition;
- isTimeoutTesting = isTest;
+ isTimeoutTesting = isTimeoutTest;
}
@Override
@@ -487,7 +550,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
this.callbacksReceived = callbacksReceived;
this.txnFail = txnFail;
this.condition = condition;
- isTimeoutTesting = isTest;
+ isTimeoutTesting = isTimeoutTest;
}
@Override
@@ -525,4 +588,36 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
}
throw new EventDeliveryException("Error in processing transaction.", e);
}
+
+  /** Map key for one cell (row + qualifier); table/cf are fixed per sink. */
+  private static final class CellIdentifier {
+    private final byte[] row;
+    private final byte[] column;
+    private final int hashCode;
+
+    public CellIdentifier(byte[] row, byte[] column) {
+      this.row = row;
+      this.column = column;
+      // Order-sensitive mix: a plain product of the two array hashes would
+      // collide for swapped row/column and be 0 whenever either hash is 0.
+      this.hashCode = 31 * Arrays.hashCode(row) + Arrays.hashCode(column);
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+
+    // Arrays.equals is true exactly when a lexicographic compare yields 0
+    // (same length, same bytes) and needs no state from the enclosing sink.
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof CellIdentifier)) {
+        return false;
+      }
+      CellIdentifier that = (CellIdentifier) other;
+      return Arrays.equals(row, that.row)
+          && Arrays.equals(column, that.column);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 7fdc75b..1a78071 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -66,4 +66,8 @@ public class HBaseSinkConfigurationConstants {
public static final String DEFAULT_ZK_ZNODE_PARENT =
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
+ public static final String CONFIG_COALESCE_INCREMENTS = "coalesceIncrements";
+
+ public static final Boolean DEFAULT_COALESCE_INCREMENTS = false;
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
new file mode 100644
index 0000000..b8aefe8
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.hbase.async.AtomicIncrementRequest;
+import org.hbase.async.PutRequest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test serializer: no puts, one increment of 1 per event. The row key is the
+ * event body; the qualifier is the "column" property (default "col").
+ */
+public class IncrementAsyncHBaseSerializer implements AsyncHbaseEventSerializer {
+  private byte[] table;
+  private byte[] cf;
+  private byte[] column;
+  private Event currentEvent;
+
+  @Override
+  public void configure(Context context) {
+    column = context.getString("column", "col").getBytes();
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  @Override
+  public void initialize(byte[] table, byte[] cf) {
+    this.table = table;
+    this.cf = cf;
+  }
+
+  @Override
+  public void setEvent(Event event) {
+    currentEvent = event;
+  }
+
+  @Override
+  public List<PutRequest> getActions() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<AtomicIncrementRequest> getIncrements() {
+    List<AtomicIncrementRequest> single =
+        new ArrayList<AtomicIncrementRequest>();
+    single.add(new AtomicIncrementRequest(
+        table, currentEvent.getBody(), cf, column, 1));
+    return single;
+  }
+
+  @Override
+  public void cleanUp() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/c4e2129f/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index a0c04eb..ccbc086 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -204,7 +204,7 @@ public class TestAsyncHBaseSink {
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
deleteTable = true;
AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
- true);
+ true, false);
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
Configurables.configure(channel, ctx);
@@ -271,6 +271,86 @@ public class TestAsyncHBaseSink {
}
@Test
+ public void testMultipleBatchesBatchIncrementsWithCoalescing()
+ throws Exception {
+ doTestMultipleBatchesBatchIncrements(true); // coalesceIncrements=true; helper expects 8 callbacks
+ }
+
+ @Test
+ public void testMultipleBatchesBatchIncrementsNoCoalescing()
+ throws Exception {
+ doTestMultipleBatchesBatchIncrements(false); // coalescing left off; helper expects 12 callbacks
+ }
+
+ public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws
+ Exception {
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ deleteTable = true;
+ AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
+ false, true);
+ if (coalesce) {
+ ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+ "true");
+ }
+ ctx.put("batchSize", "2");
+ ctx.put("serializer", IncrementAsyncHBaseSerializer.class.getName());
+ ctx.put("serializer.column", "test");
+ Configurables.configure(sink, ctx);
+ //Reset the context to a higher batchSize
+ ctx.put("batchSize", "100");
+ // Restore the original serializer
+ ctx.put("serializer", SimpleAsyncHbaseEventSerializer.class.getName());
+ //Restore the no coalescing behavior
+ ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+ "false");
+ Channel channel = new MemoryChannel();
+ Configurables.configure(channel, ctx);
+ sink.setChannel(channel);
+ sink.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 3; j++) {
+ Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
+ channel.put(e);
+ }
+ }
+ tx.commit();
+ tx.close();
+ int count = 0;
+ Status status = Status.READY;
+ while (status != Status.BACKOFF) {
+ count++;
+ status = sink.process();
+ }
+ Assert.assertFalse(sink.isConfNull());
+ sink.stop();
+ Assert.assertEquals(7, count);
+ HTable table = new HTable(testUtility.getConfiguration(), tableName);
+ Scan scan = new Scan();
+ scan.addColumn(columnFamily.getBytes(),"test".getBytes());
+ scan.setStartRow(Bytes.toBytes(valBase));
+ ResultScanner rs = table.getScanner(scan);
+ int i = 0;
+ try {
+ for (Result r = rs.next(); r != null; r = rs.next()) {
+ byte[] out = r.getValue(columnFamily.getBytes(), "test".getBytes());
+ Assert.assertArrayEquals(Longs.toByteArray(3), out);
+ Assert.assertTrue(new String(r.getRow()).startsWith(valBase));
+ i++;
+ }
+ } finally {
+ rs.close();
+ }
+ Assert.assertEquals(4, i);
+ if (coalesce) {
+ Assert.assertEquals(8, sink.getTotalCallbacksReceived());
+ } else {
+ Assert.assertEquals(12, sink.getTotalCallbacksReceived());
+ }
+ }
+
+ @Test
public void testWithoutConfigurationObject() throws Exception{
testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
deleteTable = true;