You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/03/03 08:50:20 UTC
git commit: FLUME-2338. Support coalescing increments in HBaseSink.
Repository: flume
Updated Branches:
refs/heads/trunk a6a6c4c2a -> 674f4fcce
FLUME-2338. Support coalescing increments in HBaseSink.
(Mike Percy via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/674f4fcc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/674f4fcc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/674f4fcc
Branch: refs/heads/trunk
Commit: 674f4fcce2597e7e934ccc69eb04b426f5a9b8bb
Parents: a6a6c4c
Author: Hari Shreedharan <hs...@apache.org>
Authored: Sun Mar 2 23:49:17 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Sun Mar 2 23:49:17 2014 -0800
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +
.../org/apache/flume/sink/hbase/BatchAware.java | 28 ++
.../org/apache/flume/sink/hbase/HBaseSink.java | 195 +++++++++++-
.../sink/hbase/IncrementHBaseSerializer.java | 80 +++++
.../apache/flume/sink/hbase/TestHBaseSink.java | 298 +++++++++++++++----
5 files changed, 550 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 96bf73e..cedb283 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1839,6 +1839,8 @@ Property Name Default Desc
zookeeperQuorum -- The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
znodeParent /hbase The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
batchSize 100 Number of events to be written per txn.
+coalesceIncrements false Should the sink coalesce multiple increments to a cell per batch. This might give
+ better performance if there are multiple increments to a limited number of cells.
serializer org.apache.flume.sink.hbase.SimpleHbaseEventSerializer Default increment column = "iCol", payload column = "pCol".
serializer.* -- Properties to be passed to the serializer.
kerberosPrincipal -- Kerberos user principal for accessing secure HBase
http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
new file mode 100644
index 0000000..0974241
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+/**
+ * This interface allows for implementing HBase serializers that are aware of
+ * batching. {@link #onBatchStart()} is called at the beginning of each batch
+ * by the sink.
+ */
+public interface BatchAware {
+ public void onBatchStart();
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index c4a666c..0390ff8 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -19,15 +19,23 @@
package org.apache.flume.sink.hbase;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
+import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
@@ -52,7 +60,7 @@ import org.apache.hadoop.hbase.security.User;
/**
*
* A simple sink which reads events from a channel and writes them to HBase.
- * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt>
+ * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
* encountered in the classpath. This sink supports batch reading of
* events from the channel, and writing them to Hbase, to minimize the number
* of flushes on the hbase tables. To use this sink, it has to be configured
@@ -97,8 +105,13 @@ public class HBaseSink extends AbstractSink implements Configurable {
private String kerberosKeytab;
private User hbaseUser;
private boolean enableWal = true;
+ private boolean batchIncrements = false;
+ private Method refGetFamilyMap;
private SinkCounter sinkCounter;
+ // Internal hooks used for unit testing.
+ private DebugIncrementsCallback debugIncrCallback = null;
+
public HBaseSink(){
this(HBaseConfiguration.create());
}
@@ -107,6 +120,13 @@ public class HBaseSink extends AbstractSink implements Configurable {
this.config = conf;
}
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ HBaseSink(Configuration conf, DebugIncrementsCallback cb) {
+ this(conf);
+ this.debugIncrCallback = cb;
+ }
+
@Override
public void start(){
Preconditions.checkArgument(table == null, "Please call stop " +
@@ -222,6 +242,17 @@ public class HBaseSink extends AbstractSink implements Configurable {
"writes to HBase will have WAL disabled, and any data in the " +
"memstore of this region in the Region Server could be lost!");
}
+
+ batchIncrements = context.getBoolean(
+ HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+ HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+
+ if (batchIncrements) {
+ logger.info("Increment coalescing is enabled. Increments will be " +
+ "buffered.");
+ reflectLookupGetFamilyMap();
+ }
+
String zkQuorum = context.getString(HBaseSinkConfigurationConstants
.ZK_QUORUM);
Integer port = null;
@@ -281,6 +312,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
List<Increment> incs = new LinkedList<Increment>();
try {
txn.begin();
+
+ if (serializer instanceof BatchAware) {
+ ((BatchAware)serializer).onBatchStart();
+ }
+
long i = 0;
for (; i < batchSize; i++) {
Event event = channel.take();
@@ -309,7 +345,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
try{
txn.rollback();
} catch (Exception e2) {
- logger.error("Exception in rollback. Rollback might not have been" +
+ logger.error("Exception in rollback. Rollback might not have been " +
"successful." , e2);
}
logger.error("Failed to commit transaction." +
@@ -353,7 +389,20 @@ public class HBaseSink extends AbstractSink implements Configurable {
runPrivileged(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
- for (final Increment i : incs) {
+
+ List<Increment> processedIncrements;
+ if (batchIncrements) {
+ processedIncrements = coalesceIncrements(incs);
+ } else {
+ processedIncrements = incs;
+ }
+
+ // Only used for unit testing.
+ if (debugIncrCallback != null) {
+ debugIncrCallback.onAfterCoalesce(processedIncrements);
+ }
+
+ for (final Increment i : processedIncrements) {
i.setWriteToWAL(enableWal);
table.increment(i);
}
@@ -364,6 +413,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
txn.commit();
sinkCounter.addToEventDrainSuccessCount(actions.size());
}
+
private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
throws Exception {
if(hbaseUser != null) {
@@ -375,4 +425,143 @@ public class HBaseSink extends AbstractSink implements Configurable {
return action.run();
}
}
+
+ /**
+ * The method getFamilyMap() is no longer available in Hbase 0.96.
+ * We must use reflection to determine which version we may use.
+ */
+ private void reflectLookupGetFamilyMap() {
+ refGetFamilyMap = null;
+ String[] methodNames = { "getFamilyMap", "getFamilyMapOfLongs" };
+ for (String methodName : methodNames) {
+ try {
+ refGetFamilyMap = Increment.class.getMethod(methodName);
+ if (refGetFamilyMap != null) {
+ logger.debug("Using Increment.{} for coalesce", methodName);
+ break;
+ }
+ } catch (NoSuchMethodException e) {
+ logger.debug("Increment.{} does not exist. Exception follows.",
+ methodName, e);
+ } catch (SecurityException e) {
+ logger.debug("No access to Increment.{}; Exception follows.",
+ methodName, e);
+ }
+ }
+ if (refGetFamilyMap == null) {
+ throw new UnsupportedOperationException(
+ "Cannot find Increment.getFamilyMap()");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
+ Preconditions.checkNotNull(refGetFamilyMap,
+ "Increment.getFamilymap() not found");
+ Preconditions.checkNotNull(inc, "Increment required");
+ Map<byte[], NavigableMap<byte[], Long>> familyMap = null;
+ try {
+ Object familyObj = refGetFamilyMap.invoke(inc);
+ familyMap = (Map<byte[], NavigableMap<byte[], Long>>) familyObj;
+ } catch (IllegalAccessException e) {
+ logger.warn("Unexpected error calling getFamilyMap()", e);
+ Throwables.propagate(e);
+ } catch (InvocationTargetException e) {
+ logger.warn("Unexpected error calling getFamilyMap()", e);
+ Throwables.propagate(e);
+ }
+ return familyMap;
+ }
+
+ /**
+ * Perform "compression" on the given set of increments so that Flume sends
+ * the minimum possible number of RPC operations to HBase per batch.
+ * @param incs Input: Increment objects to coalesce.
+ * @return List of new Increment objects after coalescing the unique counts.
+ */
+ private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
+ Preconditions.checkNotNull(incs, "List of Increments must not be null");
+ // Aggregate all of the increment row/family/column counts.
+ // The nested map is keyed like this: {row, family, qualifier} => count.
+ Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters =
+ Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ for (Increment inc : incs) {
+ byte[] row = inc.getRow();
+ Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
+ for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) {
+ byte[] family = familyEntry.getKey();
+ NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+ for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+ byte[] qualifier = qualifierEntry.getKey();
+ Long count = qualifierEntry.getValue();
+ incrementCounter(counters, row, family, qualifier, count);
+ }
+ }
+ }
+
+ // Reconstruct list of Increments per unique row/family/qualifier.
+ List<Increment> coalesced = Lists.newLinkedList();
+ for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
+ byte[] row = rowEntry.getKey();
+ Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
+ Increment inc = new Increment(row);
+ for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
+ byte[] family = familyEntry.getKey();
+ NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+ for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+ byte[] qualifier = qualifierEntry.getKey();
+ long count = qualifierEntry.getValue();
+ inc.addColumn(family, qualifier, count);
+ }
+ }
+ coalesced.add(inc);
+ }
+
+ return coalesced;
+ }
+
+ /**
+ * Helper function for {@link #coalesceIncrements} to increment a counter
+ * value in the passed data structure.
+ * @param counters Nested data structure containing the counters.
+ * @param row Row key to increment.
+ * @param family Column family to increment.
+ * @param qualifier Column qualifier to increment.
+ * @param count Amount to increment by.
+ */
+ private void incrementCounter(
+ Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
+ byte[] row, byte[] family, byte[] qualifier, Long count) {
+
+ Map<byte[], NavigableMap<byte[], Long>> families = counters.get(row);
+ if (families == null) {
+ families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ counters.put(row, families);
+ }
+
+ NavigableMap<byte[], Long> qualifiers = families.get(family);
+ if (qualifiers == null) {
+ qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ families.put(family, qualifiers);
+ }
+
+ Long existingValue = qualifiers.get(qualifier);
+ if (existingValue == null) {
+ qualifiers.put(qualifier, count);
+ } else {
+ qualifiers.put(qualifier, existingValue + count);
+ }
+ }
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ HbaseEventSerializer getSerializer() {
+ return serializer;
+ }
+
+ @VisibleForTesting
+ @InterfaceAudience.Private
+ interface DebugIncrementsCallback {
+ public void onAfterCoalesce(Iterable<Increment> increments);
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
new file mode 100644
index 0000000..b4343eb
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Row;
+
+import java.util.List;
+
+/**
+ * For Increment-related unit tests.
+ */
+class IncrementHBaseSerializer implements HbaseEventSerializer, BatchAware {
+ private Event event;
+ private byte[] family;
+ private int numBatchesStarted = 0;
+
+ @Override public void configure(Context context) { }
+ @Override public void configure(ComponentConfiguration conf) { }
+ @Override public void close() { }
+
+ @Override
+ public void initialize(Event event, byte[] columnFamily) {
+ this.event = event;
+ this.family = columnFamily;
+ }
+
+ // This class only creates Increments.
+ @Override
+ public List<Row> getActions() {
+ return Collections.emptyList();
+ }
+
+ // Treat each Event as a String, i.e., "row:qualifier".
+ @Override
+ public List<Increment> getIncrements() {
+ List<Increment> increments = Lists.newArrayList();
+ String body = new String(event.getBody(), Charsets.UTF_8);
+ String[] pieces = body.split(":");
+ String row = pieces[0];
+ String qualifier = pieces[1];
+ Increment inc = new Increment(row.getBytes(Charsets.UTF_8));
+ inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L);
+ increments.add(inc);
+ return increments;
+ }
+
+ @Override
+ public void onBatchStart() {
+ numBatchesStarted++;
+ }
+
+ @VisibleForTesting
+ public int getNumBatchesStarted() {
+ return numBatchesStarted;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index d1b0182..5b047dc 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -22,9 +22,13 @@ import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-
+import java.util.NavigableMap;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
@@ -37,64 +41,95 @@ import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.Assert;
-
-import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestHBaseSink {
- private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
- private static String tableName = "TestHbaseSink";
- private static String columnFamily = "TestColumnFamily";
- private static String inColumn = "iCol";
- private static String plCol = "pCol";
- private static Context ctx = new Context();
- private static String valBase = "testing hbase sink: jham";
- private static Configuration conf;
+ private static final Logger logger =
+ LoggerFactory.getLogger(TestHBaseSink.class);
+
+ private static final HBaseTestingUtility testUtility = new HBaseTestingUtility();
+ private static final String tableName = "TestHbaseSink";
+ private static final String columnFamily = "TestColumnFamily";
+ private static final String inColumn = "iCol";
+ private static final String plCol = "pCol";
+ private static final String valBase = "testing hbase sink: jham";
+
+ private Configuration conf;
+ private Context ctx;
@BeforeClass
- public static void setUp() throws Exception {
+ public static void setUpOnce() throws Exception {
testUtility.startMiniCluster();
- Map<String, String> ctxMap = new HashMap<String, String>();
- ctxMap.put("table", tableName);
- ctxMap.put("columnFamily", columnFamily);
- ctxMap.put("serializer",
- "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
- ctxMap.put("serializer.payloadColumn", plCol);
- ctxMap.put("serializer.incrementColumn", inColumn);
- ctx.putAll(ctxMap);
- conf = new Configuration(testUtility.getConfiguration());
}
@AfterClass
- public static void tearDown() throws Exception {
+ public static void tearDownOnce() throws Exception {
testUtility.shutdownMiniCluster();
}
+ /**
+ * Most common context setup for unit tests using
+ * {@link SimpleHbaseEventSerializer}.
+ */
+ @Before
+ public void setUp() throws IOException {
+ conf = new Configuration(testUtility.getConfiguration());
+ ctx = new Context();
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ }
+ @After
+ public void tearDown() throws IOException {
+ testUtility.deleteTable(tableName.getBytes());
+ }
+
+ /**
+ * Set up {@link Context} for use with {@link SimpleHbaseEventSerializer}.
+ */
+ private void initContextForSimpleHbaseEventSerializer() {
+ ctx = new Context();
+ ctx.put("table", tableName);
+ ctx.put("columnFamily", columnFamily);
+ ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
+ ctx.put("serializer.payloadColumn", plCol);
+ ctx.put("serializer.incrementColumn", inColumn);
+ }
+
+ /**
+ * Set up {@link Context} for use with {@link IncrementHBaseSerializer}.
+ */
+ private void initContextForIncrementHBaseSerializer() {
+ ctx = new Context();
+ ctx.put("table", tableName);
+ ctx.put("columnFamily", columnFamily);
+ ctx.put("serializer", IncrementHBaseSerializer.class.getName());
+ }
@Test
public void testOneEventWithDefaults() throws Exception {
//Create a context without setting increment column and payload Column
- Map<String,String> ctxMap = new HashMap<String,String>();
- ctxMap.put("table", tableName);
- ctxMap.put("columnFamily", columnFamily);
- ctxMap.put("serializer",
- "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
- Context tmpctx = new Context();
- tmpctx.putAll(ctxMap);
+ ctx = new Context();
+ ctx.put("table", tableName);
+ ctx.put("columnFamily", columnFamily);
+ ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
@@ -117,12 +152,11 @@ public class TestHBaseSink {
Assert.assertArrayEquals(e.getBody(), out);
out = results[1];
Assert.assertArrayEquals(Longs.toByteArray(1), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testOneEvent() throws Exception {
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ initContextForSimpleHbaseEventSerializer();
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
Channel channel = new MemoryChannel();
@@ -145,12 +179,11 @@ public class TestHBaseSink {
Assert.assertArrayEquals(e.getBody(), out);
out = results[1];
Assert.assertArrayEquals(Longs.toByteArray(1), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testThreeEvents() throws Exception {
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ initContextForSimpleHbaseEventSerializer();
ctx.put("batchSize", "3");
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
@@ -183,12 +216,11 @@ public class TestHBaseSink {
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testMultipleBatches() throws Exception {
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ initContextForSimpleHbaseEventSerializer();
ctx.put("batchSize", "2");
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
@@ -227,11 +259,17 @@ public class TestHBaseSink {
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test(expected = FlumeException.class)
public void testMissingTable() throws Exception {
+ logger.info("Running testMissingTable()");
+ initContextForSimpleHbaseEventSerializer();
+
+ // setUp() will create the table, so we delete it.
+ logger.info("Deleting table {}", tableName);
+ testUtility.deleteTable(tableName.getBytes());
+
ctx.put("batchSize", "2");
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
@@ -240,7 +278,8 @@ public class TestHBaseSink {
Channel channel = new MemoryChannel();
Configurables.configure(channel, new Context());
sink.setChannel(channel);
- sink.start();
+
+ logger.info("Writing data into channel");
Transaction tx = channel.getTransaction();
tx.begin();
for(int i = 0; i < 3; i++){
@@ -249,7 +288,25 @@ public class TestHBaseSink {
}
tx.commit();
tx.close();
- sink.process();
+
+ logger.info("Starting sink and processing events");
+ try {
+ logger.info("Calling sink.start()");
+ sink.start(); // This method will throw.
+
+ // We never get here, but we log in case the behavior changes.
+ logger.error("Unexpected error: Calling sink.process()");
+ sink.process();
+ logger.error("Unexpected error: Calling sink.stop()");
+ sink.stop();
+ } finally {
+ // Re-create the table so tearDown() doesn't throw.
+ testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+ }
+
+ // FIXME: The test should never get here, the below code doesn't run.
+ Assert.fail();
+
HTable table = new HTable(conf, tableName);
byte[][] results = getResults(table, 2);
byte[] out;
@@ -266,9 +323,9 @@ public class TestHBaseSink {
out = results[2];
Assert.assertArrayEquals(Longs.toByteArray(2), out);
sink.process();
- sink.stop();
}
+ // TODO: Move this test to a different class and run it stand-alone.
/**
* This test must run last - it shuts down the minicluster :D
* @throws Exception
@@ -280,8 +337,8 @@ public class TestHBaseSink {
"and uncomment this annotation to run this test.")
@Test(expected = EventDeliveryException.class)
public void testHBaseFailure() throws Exception {
+ initContextForSimpleHbaseEventSerializer();
ctx.put("batchSize", "2");
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
//Reset the context to a higher batchSize
@@ -374,8 +431,9 @@ public class TestHBaseSink {
@Test
public void testTransactionStateOnChannelException() throws Exception {
+ initContextForSimpleHbaseEventSerializer();
ctx.put("batchSize", "1");
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
// Reset the context to a higher batchSize
@@ -405,15 +463,15 @@ public class TestHBaseSink {
Assert.assertArrayEquals(e.getBody(), out);
out = results[1];
Assert.assertArrayEquals(Longs.toByteArray(1), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testTransactionStateOnSerializationException() throws Exception {
+ initContextForSimpleHbaseEventSerializer();
ctx.put("batchSize", "1");
ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
"org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
HBaseSink sink = new HBaseSink(conf);
Configurables.configure(sink, ctx);
// Reset the context to a higher batchSize
@@ -444,11 +502,11 @@ public class TestHBaseSink {
Assert.assertArrayEquals(e.getBody(), out);
out = results[1];
Assert.assertArrayEquals(Longs.toByteArray(1), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testWithoutConfigurationObject() throws Exception{
+ initContextForSimpleHbaseEventSerializer();
Context tmpContext = new Context(ctx.getParameters());
tmpContext.put("batchSize", "2");
tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
@@ -457,7 +515,7 @@ public class TestHBaseSink {
tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
- testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
HBaseSink sink = new HBaseSink();
Configurables.configure(sink, tmpContext);
Channel channel = new MemoryChannel();
@@ -492,11 +550,11 @@ public class TestHBaseSink {
Assert.assertEquals(3, found);
out = results[3];
Assert.assertArrayEquals(Longs.toByteArray(3), out);
- testUtility.deleteTable(tableName.getBytes());
}
@Test
public void testZKQuorum() throws Exception{
+ initContextForSimpleHbaseEventSerializer();
Context tmpContext = new Context(ctx.getParameters());
String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
"zk3.flume.apache.org:3342";
@@ -516,6 +574,7 @@ public class TestHBaseSink {
@Test (expected = FlumeException.class)
public void testZKQuorumIncorrectPorts() throws Exception{
+ initContextForSimpleHbaseEventSerializer();
Context tmpContext = new Context(ctx.getParameters());
String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
@@ -529,4 +588,143 @@ public class TestHBaseSink {
Configurables.configure(sink, tmpContext);
Assert.fail();
}
-}
\ No newline at end of file
+
+ @Test
+ public void testCoalesce() throws EventDeliveryException {
+ initContextForIncrementHBaseSerializer();
+ ctx.put("batchSize", "100");
+ ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+ String.valueOf(true));
+
+ final Map<String, Long> expectedCounts = Maps.newHashMap();
+ expectedCounts.put("r1:c1", 10L);
+ expectedCounts.put("r1:c2", 20L);
+ expectedCounts.put("r2:c1", 7L);
+ expectedCounts.put("r2:c3", 63L);
+ HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+
+ HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
+ Configurables.configure(sink, ctx);
+ Channel channel = createAndConfigureMemoryChannel(sink);
+
+ List<Event> events = Lists.newLinkedList();
+ generateEvents(events, expectedCounts);
+ putEvents(channel, events);
+
+ sink.start();
+ sink.process(); // Calls CoalesceValidator instance.
+ sink.stop();
+ }
+
+ @Test(expected = AssertionError.class)
+ public void negativeTestCoalesce() throws EventDeliveryException {
+ initContextForIncrementHBaseSerializer();
+ ctx.put("batchSize", "10");
+
+ final Map<String, Long> expectedCounts = Maps.newHashMap();
+ expectedCounts.put("r1:c1", 10L);
+ HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+
+ HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
+ Configurables.configure(sink, ctx);
+ Channel channel = createAndConfigureMemoryChannel(sink);
+
+ List<Event> events = Lists.newLinkedList();
+ generateEvents(events, expectedCounts);
+ putEvents(channel, events);
+
+ sink.start();
+ sink.process(); // Calls CoalesceValidator instance.
+ sink.stop();
+ }
+
+ @Test
+ public void testBatchAware() throws EventDeliveryException {
+ logger.info("Running testBatchAware()");
+ initContextForIncrementHBaseSerializer();
+ HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+ Configurables.configure(sink, ctx);
+ Channel channel = createAndConfigureMemoryChannel(sink);
+
+ sink.start();
+ int batchCount = 3;
+ for (int i = 0; i < batchCount; i++) {
+ sink.process();
+ }
+ sink.stop();
+ Assert.assertEquals(batchCount,
+ ((IncrementHBaseSerializer) sink.getSerializer()).getNumBatchesStarted());
+ }
+
+ /**
+ * For testing that the rows coalesced, serialized by
+ * {@link IncrementHBaseSerializer}, are of the expected batch size.
+ */
+ private static class CoalesceValidator
+ implements HBaseSink.DebugIncrementsCallback {
+
+ private final Map<String,Long> expectedCounts;
+
+ public CoalesceValidator(Map<String, Long> expectedCounts) {
+ this.expectedCounts = expectedCounts;
+ }
+
+ @Override
+ public void onAfterCoalesce(Iterable<Increment> increments) {
+ for (Increment inc : increments) {
+ byte[] row = inc.getRow();
+ Map<byte[], NavigableMap<byte[], Long>> families = inc.getFamilyMap();
+ for (byte[] family : families.keySet()) {
+ NavigableMap<byte[], Long> qualifiers = families.get(family);
+ for (Map.Entry<byte[], Long> entry : qualifiers.entrySet()) {
+ byte[] qualifier = entry.getKey();
+ Long count = entry.getValue();
+ StringBuilder b = new StringBuilder(20);
+ b.append(new String(row, Charsets.UTF_8));
+ b.append(':');
+ b.append(new String(qualifier, Charsets.UTF_8));
+ String key = b.toString();
+ Assert.assertEquals("Expected counts don't match observed for " + key,
+ expectedCounts.get(key), count);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Add number of Events corresponding to counts to the events list.
+ * @param events Destination list.
+ * @param counts How many events to generate for each row:qualifier pair.
+ */
+ private void generateEvents(List<Event> events, Map<String, Long> counts) {
+ for (String key : counts.keySet()) {
+ long count = counts.get(key);
+ for (long i = 0; i < count; i++) {
+ events.add(EventBuilder.withBody(key, Charsets.UTF_8));
+ }
+ }
+ }
+
+ private Channel createAndConfigureMemoryChannel(HBaseSink sink) {
+ Channel channel = new MemoryChannel();
+ Context channelCtx = new Context();
+ channelCtx.put("capacity", String.valueOf(1000L));
+ channelCtx.put("transactionCapacity", String.valueOf(1000L));
+ Configurables.configure(channel, channelCtx);
+ sink.setChannel(channel);
+ channel.start();
+ return channel;
+ }
+
+ private void putEvents(Channel channel, Iterable<Event> events) {
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for (Event event : events) {
+ channel.put(event);
+ }
+ tx.commit();
+ tx.close();
+ }
+
+}