You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/03/03 08:50:20 UTC

git commit: FLUME-2338. Support coalescing increments in HBaseSink.

Repository: flume
Updated Branches:
  refs/heads/trunk a6a6c4c2a -> 674f4fcce


FLUME-2338. Support coalescing increments in HBaseSink.

(Mike Percy via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/674f4fcc
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/674f4fcc
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/674f4fcc

Branch: refs/heads/trunk
Commit: 674f4fcce2597e7e934ccc69eb04b426f5a9b8bb
Parents: a6a6c4c
Author: Hari Shreedharan <hs...@apache.org>
Authored: Sun Mar 2 23:49:17 2014 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Sun Mar 2 23:49:17 2014 -0800

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   2 +
 .../org/apache/flume/sink/hbase/BatchAware.java |  28 ++
 .../org/apache/flume/sink/hbase/HBaseSink.java  | 195 +++++++++++-
 .../sink/hbase/IncrementHBaseSerializer.java    |  80 +++++
 .../apache/flume/sink/hbase/TestHBaseSink.java  | 298 +++++++++++++++----
 5 files changed, 550 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 96bf73e..cedb283 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1839,6 +1839,8 @@ Property Name       Default                                                 Desc
 zookeeperQuorum     --                                                      The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
 znodeParent         /hbase                                                  The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
 batchSize           100                                                     Number of events to be written per txn.
+coalesceIncrements  false                                                   Should the sink coalesce multiple increments to a cell per batch. This might give
+                                                                            better performance if there are multiple increments to a limited number of cells.
 serializer          org.apache.flume.sink.hbase.SimpleHbaseEventSerializer  Default increment column = "iCol", payload column = "pCol".
 serializer.*        --                                                      Properties to be passed to the serializer.
 kerberosPrincipal   --                                                      Kerberos user principal for accessing secure HBase

http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
new file mode 100644
index 0000000..0974241
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/BatchAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+/**
+ * This interface allows for implementing HBase serializers that are aware of
+ * batching. {@link #onBatchStart()} is called at the beginning of each batch
+ * by the sink.
+ */
+public interface BatchAware {
+  public void onBatchStart();
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
index c4a666c..0390ff8 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSink.java
@@ -19,15 +19,23 @@
 package org.apache.flume.sink.hbase;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
+import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -52,7 +60,7 @@ import org.apache.hadoop.hbase.security.User;
 /**
  *
  * A simple sink which reads events from a channel and writes them to HBase.
- * The Hbase configution is picked up from the first <tt>hbase-site.xml</tt>
+ * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
  * encountered in the classpath. This sink supports batch reading of
  * events from the channel, and writing them to Hbase, to minimize the number
  * of flushes on the hbase tables. To use this sink, it has to be configured
@@ -97,8 +105,13 @@ public class HBaseSink extends AbstractSink implements Configurable {
   private String kerberosKeytab;
   private User hbaseUser;
   private boolean enableWal = true;
+  private boolean batchIncrements = false;
+  private Method refGetFamilyMap;
   private SinkCounter sinkCounter;
 
+  // Internal hooks used for unit testing.
+  private DebugIncrementsCallback debugIncrCallback = null;
+
   public HBaseSink(){
     this(HBaseConfiguration.create());
   }
@@ -107,6 +120,13 @@ public class HBaseSink extends AbstractSink implements Configurable {
     this.config = conf;
   }
 
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  HBaseSink(Configuration conf, DebugIncrementsCallback cb) {
+    this(conf);
+    this.debugIncrCallback = cb;
+  }
+
   @Override
   public void start(){
     Preconditions.checkArgument(table == null, "Please call stop " +
@@ -222,6 +242,17 @@ public class HBaseSink extends AbstractSink implements Configurable {
         "writes to HBase will have WAL disabled, and any data in the " +
         "memstore of this region in the Region Server could be lost!");
     }
+
+    batchIncrements = context.getBoolean(
+      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+
+    if (batchIncrements) {
+      logger.info("Increment coalescing is enabled. Increments will be " +
+        "buffered.");
+      reflectLookupGetFamilyMap();
+    }
+
     String zkQuorum = context.getString(HBaseSinkConfigurationConstants
       .ZK_QUORUM);
     Integer port = null;
@@ -281,6 +312,11 @@ public class HBaseSink extends AbstractSink implements Configurable {
     List<Increment> incs = new LinkedList<Increment>();
     try {
       txn.begin();
+
+      if (serializer instanceof BatchAware) {
+        ((BatchAware)serializer).onBatchStart();
+      }
+
       long i = 0;
       for (; i < batchSize; i++) {
         Event event = channel.take();
@@ -309,7 +345,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
       try{
         txn.rollback();
       } catch (Exception e2) {
-        logger.error("Exception in rollback. Rollback might not have been" +
+        logger.error("Exception in rollback. Rollback might not have been " +
             "successful." , e2);
       }
       logger.error("Failed to commit transaction." +
@@ -353,7 +389,20 @@ public class HBaseSink extends AbstractSink implements Configurable {
     runPrivileged(new PrivilegedExceptionAction<Void>() {
       @Override
       public Void run() throws Exception {
-        for (final Increment i : incs) {
+
+        List<Increment> processedIncrements;
+        if (batchIncrements) {
+          processedIncrements = coalesceIncrements(incs);
+        } else {
+          processedIncrements = incs;
+        }
+
+        // Only used for unit testing.
+        if (debugIncrCallback != null) {
+          debugIncrCallback.onAfterCoalesce(processedIncrements);
+        }
+
+        for (final Increment i : processedIncrements) {
           i.setWriteToWAL(enableWal);
           table.increment(i);
         }
@@ -364,6 +413,7 @@ public class HBaseSink extends AbstractSink implements Configurable {
     txn.commit();
     sinkCounter.addToEventDrainSuccessCount(actions.size());
   }
+
   private <T> T runPrivileged(final PrivilegedExceptionAction<T> action)
           throws Exception {
     if(hbaseUser != null) {
@@ -375,4 +425,143 @@ public class HBaseSink extends AbstractSink implements Configurable {
       return action.run();
     }
   }
+
+  /**
+   * The method getFamilyMap() is no longer available in Hbase 0.96.
+   * We must use reflection to determine which version we may use.
+   */
+  private void reflectLookupGetFamilyMap() {
+    refGetFamilyMap = null;
+    String[] methodNames = { "getFamilyMap", "getFamilyMapOfLongs" };
+    for (String methodName : methodNames) {
+      try {
+        refGetFamilyMap = Increment.class.getMethod(methodName);
+        if (refGetFamilyMap != null) {
+          logger.debug("Using Increment.{} for coalesce", methodName);
+          break;
+        }
+      } catch (NoSuchMethodException e) {
+        logger.debug("Increment.{} does not exist. Exception follows.",
+            methodName, e);
+      } catch (SecurityException e) {
+        logger.debug("No access to Increment.{}; Exception follows.",
+            methodName, e);
+      }
+    }
+    if (refGetFamilyMap == null) {
+      throw new UnsupportedOperationException(
+          "Cannot find Increment.getFamilyMap()");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
+    Preconditions.checkNotNull(refGetFamilyMap,
+                               "Increment.getFamilymap() not found");
+    Preconditions.checkNotNull(inc, "Increment required");
+    Map<byte[], NavigableMap<byte[], Long>> familyMap = null;
+    try {
+      Object familyObj = refGetFamilyMap.invoke(inc);
+      familyMap = (Map<byte[], NavigableMap<byte[], Long>>) familyObj;
+    } catch (IllegalAccessException e) {
+      logger.warn("Unexpected error calling getFamilyMap()", e);
+      Throwables.propagate(e);
+    } catch (InvocationTargetException e) {
+      logger.warn("Unexpected error calling getFamilyMap()", e);
+      Throwables.propagate(e);
+    }
+    return familyMap;
+  }
+
+  /**
+   * Perform "compression" on the given set of increments so that Flume sends
+   * the minimum possible number of RPC operations to HBase per batch.
+   * @param incs Input: Increment objects to coalesce.
+   * @return List of new Increment objects after coalescing the unique counts.
+   */
+  private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
+    Preconditions.checkNotNull(incs, "List of Increments must not be null");
+    // Aggregate all of the increment row/family/column counts.
+    // The nested map is keyed like this: {row, family, qualifier} => count.
+    Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters =
+        Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    for (Increment inc : incs) {
+      byte[] row = inc.getRow();
+      Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
+      for (Map.Entry<byte[], NavigableMap<byte[],Long>> familyEntry : families.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+          byte[] qualifier = qualifierEntry.getKey();
+          Long count = qualifierEntry.getValue();
+          incrementCounter(counters, row, family, qualifier, count);
+        }
+      }
+    }
+
+    // Reconstruct list of Increments per unique row/family/qualifier.
+    List<Increment> coalesced = Lists.newLinkedList();
+    for (Map.Entry<byte[], Map<byte[],NavigableMap<byte[], Long>>> rowEntry : counters.entrySet()) {
+      byte[] row = rowEntry.getKey();
+      Map <byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
+      Increment inc = new Increment(row);
+      for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+          byte[] qualifier = qualifierEntry.getKey();
+          long count = qualifierEntry.getValue();
+          inc.addColumn(family, qualifier, count);
+        }
+      }
+      coalesced.add(inc);
+    }
+
+    return coalesced;
+  }
+
+  /**
+   * Helper function for {@link #coalesceIncrements} to increment a counter
+   * value in the passed data structure.
+   * @param counters Nested data structure containing the counters.
+   * @param row Row key to increment.
+   * @param family Column family to increment.
+   * @param qualifier Column qualifier to increment.
+   * @param count Amount to increment by.
+   */
+  private void incrementCounter(
+      Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
+      byte[] row, byte[] family, byte[] qualifier, Long count) {
+
+    Map<byte[], NavigableMap<byte[], Long>> families = counters.get(row);
+    if (families == null) {
+      families = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+      counters.put(row, families);
+    }
+
+    NavigableMap<byte[], Long> qualifiers = families.get(family);
+    if (qualifiers == null) {
+      qualifiers = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+      families.put(family, qualifiers);
+    }
+
+    Long existingValue = qualifiers.get(qualifier);
+    if (existingValue == null) {
+      qualifiers.put(qualifier, count);
+    } else {
+      qualifiers.put(qualifier, existingValue + count);
+    }
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  HbaseEventSerializer getSerializer() {
+    return serializer;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  interface DebugIncrementsCallback {
+    public void onAfterCoalesce(Iterable<Increment> increments);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
new file mode 100644
index 0000000..b4343eb
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementHBaseSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Row;
+
+import java.util.List;
+
+/**
+ * For Increment-related unit tests.
+ */
+class IncrementHBaseSerializer implements HbaseEventSerializer, BatchAware {
+  private Event event;
+  private byte[] family;
+  private int numBatchesStarted = 0;
+
+  @Override public void configure(Context context) { }
+  @Override public void configure(ComponentConfiguration conf) { }
+  @Override public void close() { }
+
+  @Override
+  public void initialize(Event event, byte[] columnFamily) {
+    this.event = event;
+    this.family = columnFamily;
+  }
+
+  // This class only creates Increments.
+  @Override
+  public List<Row> getActions() {
+    return Collections.emptyList();
+  }
+
+  // Treat each Event as a String, i.e., "row:qualifier".
+  @Override
+  public List<Increment> getIncrements() {
+    List<Increment> increments = Lists.newArrayList();
+    String body = new String(event.getBody(), Charsets.UTF_8);
+    String[] pieces = body.split(":");
+    String row = pieces[0];
+    String qualifier = pieces[1];
+    Increment inc = new Increment(row.getBytes(Charsets.UTF_8));
+    inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L);
+    increments.add(inc);
+    return increments;
+  }
+
+  @Override
+  public void onBatchStart() {
+    numBatchesStarted++;
+  }
+
+  @VisibleForTesting
+  public int getNumBatchesStarted() {
+    return numBatchesStarted;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/674f4fcc/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index d1b0182..5b047dc 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -22,9 +22,13 @@ import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
+import java.util.NavigableMap;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Longs;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -37,64 +41,95 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.Assert;
-
-import com.google.common.primitives.Longs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestHBaseSink {
-  private static HBaseTestingUtility testUtility = new HBaseTestingUtility();
-  private static String tableName = "TestHbaseSink";
-  private static String columnFamily = "TestColumnFamily";
-  private static String inColumn = "iCol";
-  private static String plCol = "pCol";
-  private static Context ctx = new Context();
-  private static String valBase = "testing hbase sink: jham";
-  private static Configuration conf;
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestHBaseSink.class);
+
+  private static final HBaseTestingUtility testUtility = new HBaseTestingUtility();
+  private static final String tableName = "TestHbaseSink";
+  private static final String columnFamily = "TestColumnFamily";
+  private static final String inColumn = "iCol";
+  private static final String plCol = "pCol";
+  private static final String valBase = "testing hbase sink: jham";
+
+  private Configuration conf;
+  private Context ctx;
 
   @BeforeClass
-  public static void setUp() throws Exception {
+  public static void setUpOnce() throws Exception {
     testUtility.startMiniCluster();
-    Map<String, String> ctxMap = new HashMap<String, String>();
-    ctxMap.put("table", tableName);
-    ctxMap.put("columnFamily", columnFamily);
-    ctxMap.put("serializer",
-        "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
-    ctxMap.put("serializer.payloadColumn", plCol);
-    ctxMap.put("serializer.incrementColumn", inColumn);
-    ctx.putAll(ctxMap);
-    conf = new Configuration(testUtility.getConfiguration());
   }
 
   @AfterClass
-  public static void tearDown() throws Exception {
+  public static void tearDownOnce() throws Exception {
     testUtility.shutdownMiniCluster();
   }
 
+  /**
+   * Per-test setup: creates a fresh {@link Configuration} and an empty
+   * {@link Context}, and creates the test table.
+   */
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration(testUtility.getConfiguration());
+    ctx = new Context();
+    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+  }
 
+  @After
+  public void tearDown() throws IOException {
+    testUtility.deleteTable(tableName.getBytes());
+  }
+
+  /**
+   * Set up {@link Context} for use with {@link SimpleHbaseEventSerializer}.
+   */
+  private void initContextForSimpleHbaseEventSerializer() {
+    ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
+    ctx.put("serializer.payloadColumn", plCol);
+    ctx.put("serializer.incrementColumn", inColumn);
+  }
+
+  /**
+   * Set up {@link Context} for use with {@link IncrementHBaseSerializer}.
+   */
+  private void initContextForIncrementHBaseSerializer() {
+    ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", IncrementHBaseSerializer.class.getName());
+  }
 
   @Test
   public void testOneEventWithDefaults() throws Exception {
     //Create a context without setting increment column and payload Column
-    Map<String,String> ctxMap = new HashMap<String,String>();
-    ctxMap.put("table", tableName);
-    ctxMap.put("columnFamily", columnFamily);
-    ctxMap.put("serializer",
-            "org.apache.flume.sink.hbase.SimpleHbaseEventSerializer");
-    Context tmpctx = new Context();
-    tmpctx.putAll(ctxMap);
+    ctx = new Context();
+    ctx.put("table", tableName);
+    ctx.put("columnFamily", columnFamily);
+    ctx.put("serializer", SimpleHbaseEventSerializer.class.getName());
 
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -117,12 +152,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testOneEvent() throws Exception {
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    initContextForSimpleHbaseEventSerializer();
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -145,12 +179,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testThreeEvents() throws Exception {
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "3");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -183,12 +216,11 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testMultipleBatches() throws Exception {
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "2");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -227,11 +259,17 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test(expected = FlumeException.class)
   public void testMissingTable() throws Exception {
+    logger.info("Running testMissingTable()");
+    initContextForSimpleHbaseEventSerializer();
+
+    // setUp() will create the table, so we delete it.
+    logger.info("Deleting table {}", tableName);
+    testUtility.deleteTable(tableName.getBytes());
+
     ctx.put("batchSize", "2");
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -240,7 +278,8 @@ public class TestHBaseSink {
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, new Context());
     sink.setChannel(channel);
-    sink.start();
+
+    logger.info("Writing data into channel");
     Transaction tx = channel.getTransaction();
     tx.begin();
     for(int i = 0; i < 3; i++){
@@ -249,7 +288,25 @@ public class TestHBaseSink {
     }
     tx.commit();
     tx.close();
-    sink.process();
+
+    logger.info("Starting sink and processing events");
+    try {
+      logger.info("Calling sink.start()");
+      sink.start(); // This method will throw.
+
+      // We never get here, but we log in case the behavior changes.
+      logger.error("Unexpected error: Calling sink.process()");
+      sink.process();
+      logger.error("Unexpected error: Calling sink.stop()");
+      sink.stop();
+    } finally {
+      // Re-create the table so tearDown() doesn't throw.
+      testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+    }
+
+    // FIXME: The test should never get here; the code below doesn't run.
+    Assert.fail();
+
     HTable table = new HTable(conf, tableName);
     byte[][] results = getResults(table, 2);
     byte[] out;
@@ -266,9 +323,9 @@ public class TestHBaseSink {
     out = results[2];
     Assert.assertArrayEquals(Longs.toByteArray(2), out);
     sink.process();
-    sink.stop();
   }
 
+  // TODO: Move this test to a different class and run it stand-alone.
   /**
    * This test must run last - it shuts down the minicluster :D
    * @throws Exception
@@ -280,8 +337,8 @@ public class TestHBaseSink {
       "and uncomment this annotation to run this test.")
   @Test(expected = EventDeliveryException.class)
   public void testHBaseFailure() throws Exception {
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "2");
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     //Reset the context to a higher batchSize
@@ -374,8 +431,9 @@ public class TestHBaseSink {
 
   @Test
   public void testTransactionStateOnChannelException() throws Exception {
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
@@ -405,15 +463,15 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testTransactionStateOnSerializationException() throws Exception {
+    initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
     ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
         "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
     // Reset the context to a higher batchSize
@@ -444,11 +502,11 @@ public class TestHBaseSink {
     Assert.assertArrayEquals(e.getBody(), out);
     out = results[1];
     Assert.assertArrayEquals(Longs.toByteArray(1), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testWithoutConfigurationObject() throws Exception{
+    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     tmpContext.put("batchSize", "2");
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
@@ -457,7 +515,7 @@ public class TestHBaseSink {
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
       conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
         HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
-    testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
+
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, tmpContext);
     Channel channel = new MemoryChannel();
@@ -492,11 +550,11 @@ public class TestHBaseSink {
     Assert.assertEquals(3, found);
     out = results[3];
     Assert.assertArrayEquals(Longs.toByteArray(3), out);
-    testUtility.deleteTable(tableName.getBytes());
   }
 
   @Test
   public void testZKQuorum() throws Exception{
+    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " +
       "zk3.flume.apache.org:3342";
@@ -516,6 +574,7 @@ public class TestHBaseSink {
 
   @Test (expected = FlumeException.class)
   public void testZKQuorumIncorrectPorts() throws Exception{
+    initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
 
     String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " +
@@ -529,4 +588,143 @@ public class TestHBaseSink {
     Configurables.configure(sink, tmpContext);
     Assert.fail();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testCoalesce() throws EventDeliveryException {
+    initContextForIncrementHBaseSerializer();
+    ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        String.valueOf(true));
+    ctx.put("batchSize", "100");
+    // Per-"row:qualifier" totals the validator should observe; they sum to 100.
+    final Map<String, Long> wanted = Maps.newHashMap();
+    wanted.put("r1:c1", 10L);
+    wanted.put("r1:c2", 20L);
+    wanted.put("r2:c1", 7L);
+    wanted.put("r2:c3", 63L);
+
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(),
+        new CoalesceValidator(wanted));
+    Configurables.configure(sink, ctx);
+    Channel memChannel = createAndConfigureMemoryChannel(sink);
+    // One event is generated per unit of each expected count.
+    List<Event> generated = Lists.newLinkedList();
+    generateEvents(generated, wanted);
+    putEvents(memChannel, generated);
+    // The validator runs from inside process().
+    sink.start();
+    sink.process();
+    sink.stop();
+  }
+
+  @Test(expected = AssertionError.class)
+  public void negativeTestCoalesce() throws EventDeliveryException {
+    // This test requests no coalescing, so the validator is expected to fail.
+    initContextForIncrementHBaseSerializer();
+    ctx.put("batchSize", "10");
+    final Map<String, Long> expectedCounts = Maps.newHashMap();
+    expectedCounts.put("r1:c1", 10L);
+    HBaseSink.DebugIncrementsCallback cb = new CoalesceValidator(expectedCounts);
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration(), cb);
+    Configurables.configure(sink, ctx);
+    Channel channel = createAndConfigureMemoryChannel(sink);
+    List<Event> events = Lists.newLinkedList();
+    generateEvents(events, expectedCounts);
+    putEvents(channel, events);
+    sink.start();
+    try {
+      sink.process(); // Calls CoalesceValidator instance.
+    } finally {
+      sink.stop(); // still runs when process() throws the expected error
+    }
+  }
+
+  @Test
+  public void testBatchAware() throws EventDeliveryException {
+    logger.info("Running testBatchAware()");
+    initContextForIncrementHBaseSerializer();
+    HBaseSink sink = new HBaseSink(testUtility.getConfiguration());
+    Configurables.configure(sink, ctx);
+    createAndConfigureMemoryChannel(sink); // also attaches the channel to the sink
+    // Drive the sink through a fixed number of process() calls; no events are put.
+    final int rounds = 3;
+    sink.start();
+    for (int done = 0; done < rounds; done++) {
+      sink.process();
+    }
+    sink.stop();
+    IncrementHBaseSerializer ser = (IncrementHBaseSerializer) sink.getSerializer();
+    Assert.assertEquals(rounds, ser.getNumBatchesStarted());
+  }
+
+  /**
+   * Callback for tests: checks each coalesced increment amount against the
+   * count expected for its "row:qualifier" key and fails on any mismatch.
+   */
+  private static class CoalesceValidator
+      implements HBaseSink.DebugIncrementsCallback {
+
+    /** Expected increment amount, keyed by "row:qualifier". */
+    private final Map<String,Long> expectedCounts;
+
+    /** @param expectedCounts lookup table consulted by onAfterCoalesce. */
+    public CoalesceValidator(Map<String, Long> expectedCounts) {
+      this.expectedCounts = expectedCounts;
+    }
+
+    /**
+     * Asserts that every qualifier amount in every increment equals the
+     * expected count; the column family itself is not part of the key.
+     */
+    @Override
+    public void onAfterCoalesce(Iterable<Increment> increments) {
+      for (Increment incr : increments) {
+        for (NavigableMap<byte[], Long> cols : incr.getFamilyMap().values()) {
+          for (Map.Entry<byte[], Long> col : cols.entrySet()) {
+            String rowName = new String(incr.getRow(), Charsets.UTF_8);
+            String qualifierName = new String(col.getKey(), Charsets.UTF_8);
+            String key = rowName + ':' + qualifierName;
+            Long observed = col.getValue();
+            Assert.assertEquals("Expected counts don't match observed for " + key,
+                expectedCounts.get(key), observed);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends, for each entry of counts, that many events whose body is the key.
+   * @param events List that receives the generated events.
+   * @param counts Number of events to create per row:qualifier key.
+   */
+  private void generateEvents(List<Event> events, Map<String, Long> counts) {
+    for (Map.Entry<String, Long> wanted : counts.entrySet()) {
+      String body = wanted.getKey();
+      for (long left = wanted.getValue(); left > 0; left--) {
+        events.add(EventBuilder.withBody(body, Charsets.UTF_8));
+      }
+    }
+  }
+
+  private Channel createAndConfigureMemoryChannel(HBaseSink sink) {
+    Context settings = new Context(); // both limits below are set to 1000
+    settings.put("transactionCapacity", String.valueOf(1000L));
+    settings.put("capacity", String.valueOf(1000L));
+    Channel memChannel = new MemoryChannel();
+    Configurables.configure(memChannel, settings);
+    sink.setChannel(memChannel);
+    memChannel.start();
+    return memChannel;
+  }
+
+  private void putEvents(Channel channel, Iterable<Event> events) {
+    final Transaction txn = channel.getTransaction();
+    txn.begin(); // no rollback path: a failing put() propagates to the caller
+    for (final Event next : events) {
+      channel.put(next);
+    }
+    txn.commit();
+    txn.close();
+  }
+
+}