Posted to commits@metron.apache.org by ni...@apache.org on 2019/07/18 13:56:59 UTC

[metron] branch feature/METRON-2088-support-hdp-3.1 updated: METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
     new ef3a76c  METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456
ef3a76c is described below

commit ef3a76c799d48856d8e4999226ec9392f298a2f7
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Thu Jul 18 09:56:31 2019 -0400

    METRON-2175 Introduce HBase Connection Abstractions for HBase 2.0.2 (nickwallen) closes apache/metron#1456
---
 .../metron/profiler/client/ProfileWriter.java      |   6 +-
 .../spark/function/HBaseWriterFunction.java        |   4 +-
 .../org/apache/metron/hbase/bolt/HBaseBolt.java    |   8 +-
 .../apache/metron/hbase/bolt/HBaseBoltTest.java    |   6 +-
 .../org/apache/metron/rest/config/HBaseConfig.java |   6 +-
 .../impl/SensorEnrichmentConfigServiceImpl.java    |   6 +-
 .../org/apache/metron/rest/config/TestConfig.java  |   6 +-
 .../SensorEnrichmentConfigServiceImplTest.java     |   6 +-
 .../metron-elasticsearch-common/pom.xml            |  18 ++
 .../metron/hbase/coprocessor/HBaseCacheWriter.java |   4 +-
 .../java/org/apache/metron/hbase/ColumnList.java   |  97 ++++++-
 .../metron/hbase/HBaseProjectionCriteria.java      |  10 +-
 .../apache/metron/hbase/client/HBaseClient.java    | 317 ++++-----------------
 .../metron/hbase/client/HBaseClientFactory.java    |  70 +++++
 .../hbase/client/HBaseConnectionFactory.java       |  59 ++++
 .../metron/hbase/client/HBaseTableClient.java      | 285 ++++++++++++++++++
 .../hbase/client/HBaseTableClientFactory.java      |  54 ++++
 .../metron/hbase/client/HBaseWriterParams.java     |  51 ++++
 .../{HBaseClient.java => LegacyHBaseClient.java}   |   4 +-
 .../metron/hbase/client/HBaseClientTest.java       |  10 +-
 .../HBaseTableClientIntegrationTest.java           | 286 +++++++++++++++++++
 metron-platform/metron-pcap/pom.xml                |   5 +
 pom.xml                                            |  26 +-
 23 files changed, 1039 insertions(+), 305 deletions(-)
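
This patch replaces the concrete HBaseClient class with an HBaseClient interface backed by the HBase 2.x Connection/Table API (HBaseTableClient), adds factories for creating connections and clients, and renames the old HTableInterface-based implementation to LegacyHBaseClient so existing callers keep working. A minimal sketch of how the new pieces are intended to fit together; the table, family, and qualifier names below are illustrative and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.metron.hbase.ColumnList;
    import org.apache.metron.hbase.client.HBaseClient;
    import org.apache.metron.hbase.client.HBaseClientFactory;
    import org.apache.metron.hbase.client.HBaseConnectionFactory;
    import org.apache.metron.hbase.client.HBaseTableClientFactory;

    public class NewClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        HBaseConnectionFactory connections = new HBaseConnectionFactory();
        HBaseClientFactory clientFactory = new HBaseTableClientFactory();

        // the factory hides HBaseTableClient construction behind the HBaseClient interface
        try (HBaseClient client = clientFactory.create(connections, config, "enrichment")) {
          client.addMutation(Bytes.toBytes("row-1"), new ColumnList().addColumn("t", "v", "{}"));
          client.mutate();
        }
      }
    }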

diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
index 4e00164..d4e5e66 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.ColumnList;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
@@ -47,13 +47,13 @@ public class ProfileWriter {
 
   private RowKeyBuilder rowKeyBuilder;
   private ColumnBuilder columnBuilder;
-  private HBaseClient hbaseClient;
+  private LegacyHBaseClient hbaseClient;
   private HBaseProfilerClient client;
 
   public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table, long periodDurationMillis) {
     this.rowKeyBuilder = rowKeyBuilder;
     this.columnBuilder = columnBuilder;
-    this.hbaseClient = new HBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
+    this.hbaseClient = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
     this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
   }
 
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
index cfabd94..6a090cf 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
@@ -118,7 +118,7 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
 
       // open an HBase connection
       Configuration config = HBaseConfiguration.create();
-      try (HBaseClient client = new HBaseClient(tableProvider, config, tableName)) {
+      try (LegacyHBaseClient client = new LegacyHBaseClient(tableProvider, config, tableName)) {
 
         for (ProfileMeasurementAdapter adapter : measurements) {
           ProfileMeasurement m = adapter.toProfileMeasurement();
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index ec860a5..07bd552 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -31,7 +31,7 @@ import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.hbase.ColumnList;
 import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -90,7 +90,7 @@ public class HBaseBolt extends BaseRichBolt {
 
   private BatchHelper batchHelper;
   protected OutputCollector collector;
-  protected transient HBaseClient hbaseClient;
+  protected transient LegacyHBaseClient hbaseClient;
 
   public HBaseBolt(String tableName, HBaseMapper mapper) {
     this.tableName = tableName;
@@ -122,7 +122,7 @@ public class HBaseBolt extends BaseRichBolt {
     return this;
   }
 
-  public void setClient(HBaseClient hbaseClient) {
+  public void setClient(LegacyHBaseClient hbaseClient) {
     this.hbaseClient = hbaseClient;
   }
 
@@ -147,7 +147,7 @@ public class HBaseBolt extends BaseRichBolt {
       provider = this.tableProvider;
     }
 
-    hbaseClient = new HBaseClient(provider, HBaseConfiguration.create(), tableName);
+    hbaseClient = new LegacyHBaseClient(provider, HBaseConfiguration.create(), tableName);
   }
 
   @Override
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index bae3728..9146aff 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -25,7 +25,7 @@ import org.apache.storm.Constants;
 import org.apache.storm.tuple.Tuple;
 import org.apache.metron.hbase.bolt.mapper.Widget;
 import org.apache.metron.hbase.bolt.mapper.WidgetMapper;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,7 +48,7 @@ import static org.mockito.Mockito.when;
 public class HBaseBoltTest extends BaseBoltTest {
 
   private static final String tableName = "widgets";
-  private HBaseClient client;
+  private LegacyHBaseClient client;
   private Tuple tuple1;
   private Tuple tuple2;
   private Widget widget1;
@@ -71,7 +71,7 @@ public class HBaseBoltTest extends BaseBoltTest {
   public void setup() throws Exception {
     tuple1 = mock(Tuple.class);
     tuple2 = mock(Tuple.class);
-    client = mock(HBaseClient.class);
+    client = mock(LegacyHBaseClient.class);
     provider = mock(TableProvider.class);
   }
 
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
index 7ce16f9..c1476e8 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/HBaseConfig.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.hbase.HTableProvider;
 import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.GlobalConfigService;
 import org.apache.metron.rest.user.UserSettingsClient;
@@ -60,7 +60,7 @@ public class HBaseConfig {
     }
 
     @Bean()
-    public HBaseClient hBaseClient() {
+    public LegacyHBaseClient hBaseClient() {
       Map<String, Object> restConfig = null;
       try {
         restConfig = globalConfigService.get();
@@ -75,7 +75,7 @@ public class HBaseConfig {
       } catch (ClassNotFoundException | InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException e) {
         throw new IllegalStateException("Unable to create table provider", e);
       }
-      return new HBaseClient(provider, HBaseConfiguration.create(),
+      return new LegacyHBaseClient(provider, HBaseConfiguration.create(),
           (String) restConfig.get(EnrichmentConfigurations.TABLE_NAME));
     }
 
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
index 5c0f2e0..d7f7b7f 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java
@@ -32,7 +32,7 @@ import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.zookeeper.ConfigurationsCache;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorEnrichmentConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -48,12 +48,12 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig
 
     private ConfigurationsCache cache;
 
-    private HBaseClient hBaseClient;
+    private LegacyHBaseClient hBaseClient;
 
     @Autowired
     public SensorEnrichmentConfigServiceImpl(final ObjectMapper objectMapper,
         final CuratorFramework client, final ConfigurationsCache cache,
-        final HBaseClient hBaseClient) {
+        final LegacyHBaseClient hBaseClient) {
       this.objectMapper = objectMapper;
       this.client = client;
       this.cache = cache;
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index d42f128..b3a478b 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -46,7 +46,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.zookeeper.ConfigurationsCache;
 import org.apache.metron.common.zookeeper.ZKConfigurationsCache;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
@@ -202,7 +202,7 @@ public class TestConfig {
   }
 
   @Bean()
-  public HBaseClient hBaseClient() throws RestException, IOException {
+  public LegacyHBaseClient hBaseClient() throws RestException, IOException {
     final String cf = "t";
     final String cq = "v";
     HTableInterface table = MockHBaseTableProvider.addToCache("enrichment_list", cf);
@@ -216,7 +216,7 @@ public class TestConfig {
       put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(cq), "{}".getBytes(StandardCharsets.UTF_8));
       table.put(put);
     }
-    return new HBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(),
+    return new LegacyHBaseClient(new MockHBaseTableProvider(), HBaseConfiguration.create(),
         "enrichment_list");
   }
 
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
index 3d07da5..4176121 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java
@@ -42,7 +42,7 @@ import org.apache.metron.common.configuration.enrichment.EnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
 import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig;
 import org.apache.metron.common.zookeeper.ConfigurationsCache;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SensorEnrichmentConfigService;
 import org.apache.zookeeper.KeeperException;
@@ -82,14 +82,14 @@ public class SensorEnrichmentConfigServiceImplTest {
   public static String broJson;
 
   ConfigurationsCache cache;
-  private HBaseClient hBaseClient;
+  private LegacyHBaseClient hBaseClient;
 
   @Before
   public void setUp() throws Exception {
     objectMapper = mock(ObjectMapper.class);
     curatorFramework = mock(CuratorFramework.class);
     cache = mock(ConfigurationsCache.class);
-    hBaseClient = mock(HBaseClient.class);
+    hBaseClient = mock(LegacyHBaseClient.class);
     sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache, hBaseClient);
   }
 
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
index 190bcb2..ce61df3 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-common/pom.xml
@@ -59,9 +59,23 @@
                     <groupId>log4j</groupId>
                     <artifactId>log4j</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.4.9</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>${guava_version}</version>
@@ -178,6 +192,10 @@
                     <groupId>io.netty</groupId>
                     <artifactId>netty-all</artifactId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>commons-httpclient</artifactId>
+                    <groupId>commons-httpclient</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
diff --git a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java
index b1bbdde..5d19960 100644
--- a/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java
+++ b/metron-platform/metron-hbase-server/src/main/java/org/apache/metron/hbase/coprocessor/HBaseCacheWriter.java
@@ -26,7 +26,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.LegacyHBaseClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +59,7 @@ public class HBaseCacheWriter implements CacheWriter<String, String> {
   @Override
   public void write(@Nonnull String key, @Nonnull String value) {
     LOG.debug("Calling hbase cache writer with key='{}', value='{}'", key, value);
-    try (HBaseClient hbClient = new HBaseClient(this.tableProvider, this.config, this.tableName)) {
+    try (LegacyHBaseClient hbClient = new LegacyHBaseClient(this.tableProvider, this.config, this.tableName)) {
       LOG.debug("rowKey={}, columnFamily={}, columnQualifier={}, value={}", key, columnFamily,
           columnQualifier, value);
       hbClient.put(key, columnFamily, columnQualifier, value);
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java
index 01a5cc6..33af434 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/ColumnList.java
@@ -20,8 +20,12 @@
 
 package org.apache.metron.hbase;
 
+import org.apache.hadoop.hbase.util.Bytes;
+
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Represents a list of HBase columns.
@@ -76,6 +80,30 @@ public class ColumnList {
     public long getTs() {
       return ts;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Column)) return false;
+      Column column = (Column) o;
+      return ts == column.ts &&
+              Arrays.equals(value, column.value);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(ts);
+      result = 31 * result + Arrays.hashCode(value);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "Column{" +
+              "value=" + Arrays.toString(value) +
+              ", ts=" + ts +
+              '}';
+    }
   }
 
   public static class Counter extends AbstractColumn {
@@ -88,6 +116,26 @@ public class ColumnList {
     public long getIncrement() {
       return incr;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (!(o instanceof Counter)) return false;
+      Counter counter = (Counter) o;
+      return incr == counter.incr;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(incr);
+    }
+
+    @Override
+    public String toString() {
+      return "Counter{" +
+              "incr=" + incr +
+              '}';
+    }
   }
 
   private ArrayList<ColumnList.Column> columns;
@@ -123,6 +171,24 @@ public class ColumnList {
     return this;
   }
 
+  public ColumnList addColumn(byte[] family, byte[] qualifier){
+    addColumn(new Column(family, qualifier, -1, null));
+    return this;
+  }
+
+  public ColumnList addColumn(String family, String qualifier){
+    addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
+    return this;
+  }
+
+  public ColumnList addColumn(String family, String qualifier, byte[] value){
+    return addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
+  }
+
+  public ColumnList addColumn(String family, String qualifier, String value){
+    return addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
+  }
+
   /**
    * Add a standard HBase column given an instance of a class that implements
    * the <code>IColumn</code> interface.
@@ -131,6 +197,11 @@ public class ColumnList {
     return this.addColumn(column.family(), column.qualifier(), column.timestamp(), column.value());
   }
 
+  public ColumnList addColumn(Column column){
+    columns().add(column);
+    return this;
+  }
+
   /**
    * Add an HBase counter column.
    */
@@ -152,14 +223,14 @@ public class ColumnList {
    * Query to determine if we have column definitions.
    */
   public boolean hasColumns(){
-    return this.columns != null;
+    return columns != null && columns.size() > 0;
   }
 
   /**
    * Query to determine if we have counter definitions.
    */
   public boolean hasCounters(){
-    return this.counters != null;
+    return this.counters != null && counters.size() > 0;
   }
 
   /**
@@ -175,4 +246,26 @@ public class ColumnList {
   public List<Counter> getCounters(){
     return this.counters;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof ColumnList)) return false;
+    ColumnList that = (ColumnList) o;
+    return Objects.equals(columns, that.columns) &&
+            Objects.equals(counters, that.counters);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(columns, counters);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnList{" +
+            "columns=" + columns +
+            ", counters=" + counters +
+            '}';
+  }
 }
\ No newline at end of file
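
The ColumnList changes above add String-friendly addColumn overloads, a value-less column form, and equals/hashCode/toString so column lists can be compared in tests; hasColumns() and hasCounters() now also require the underlying lists to be non-empty. A short illustration of the new overloads, with made-up family and qualifier names:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.metron.hbase.ColumnList;

    public class ColumnListSketch {
      public static void main(String[] args) {
        ColumnList cols = new ColumnList()
            .addColumn("cf", "name", "alice")               // String family, qualifier, value
            .addColumn("cf", "count", Bytes.toBytes(42L))   // String family/qualifier, byte[] value
            .addColumn("cf", "flag");                       // column with no value

        System.out.println(cols.hasColumns());              // true, at least one column added
        System.out.println(new ColumnList().hasColumns());  // false, nothing added yet
        System.out.println(cols);                           // readable via the new toString()
      }
    }
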
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java
index 7432b02..6bacfb2 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/HBaseProjectionCriteria.java
@@ -40,12 +40,16 @@ public class HBaseProjectionCriteria implements Serializable {
 
   public static class ColumnMetaData implements Serializable {
 
-    private byte[]  columnFamily;
+    private byte[] columnFamily;
     private byte[] qualifier;
 
     public ColumnMetaData(String columnFamily, String qualifier) {
-      this.columnFamily = columnFamily.getBytes();
-      this.qualifier = qualifier.getBytes();
+      this(columnFamily.getBytes(), qualifier.getBytes());
+    }
+
+    public ColumnMetaData(byte[] columnFamily, byte[] qualifier) {
+      this.columnFamily = columnFamily;
+      this.qualifier = qualifier;
     }
 
     public byte[] getColumnFamily() {
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
index d0d934e..f51927a 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
@@ -17,321 +17,114 @@
  *  limitations under the License.
  *
  */
-
 package org.apache.metron.hbase.client;
 
-import static org.apache.commons.collections4.CollectionUtils.size;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.hbase.ColumnList;
 import org.apache.metron.hbase.HBaseProjectionCriteria;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
 
 /**
  * A client that interacts with HBase.
  */
-public class HBaseClient implements Closeable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+public interface HBaseClient extends Closeable {
 
   /**
-   * The batch of queued Mutations.
+   * Enqueues a 'get' request that will be submitted when {@link #getAll()} is called.
+   * @param rowKey The row key to be retrieved.
    */
-  List<Mutation> mutations;
+  void addGet(byte[] rowKey, HBaseProjectionCriteria criteria);
 
   /**
-   * The batch of queued Gets.
+   * Submits all pending get operations and returns the result of each.
+   * @return The result of each pending get request.
    */
-  List<Get> gets;
-
-  /**
-   * The HBase table this client interacts with.
-   */
-  private HTableInterface table;
-
-  public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
-    this.mutations = new ArrayList<>();
-    this.gets = new ArrayList<>();
-    try {
-      this.table = provider.getTable(configuration, tableName);
-    } catch (Exception e) {
-      String msg = String.format("Unable to open connection to HBase for table '%s'", tableName);
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
-    }
-  }
+  Result[] getAll();
 
   /**
-   * Add a Mutation such as a Put or Increment to the batch.  The Mutation is only queued for
-   * later execution.
-   *
-   * @param rowKey     The row key of the Mutation.
-   * @param cols       The columns affected by the Mutation.
-   * @param durability The durability of the mutation.
+   * Clears all pending get operations.
    */
-  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
-
-    if (cols.hasColumns()) {
-      Put put = createPut(rowKey, cols, durability);
-      mutations.add(put);
-    }
-
-    if (cols.hasCounters()) {
-      Increment inc = createIncrement(rowKey, cols, durability);
-      mutations.add(inc);
-    }
-
-    if (mutations.isEmpty()) {
-      mutations.add(new Put(rowKey));
-    }
-  }
+  void clearGets();
 
   /**
-   * Adds a Mutation such as a Put or Increment with a time to live.  The Mutation is only queued
-   * for later execution.
+   * Scans an entire table returning all row keys as a List of Strings.
    *
-   * @param rowKey           The row key of the Mutation.
-   * @param cols             The columns affected by the Mutation.
-   * @param durability       The durability of the mutation.
-   * @param timeToLiveMillis The time to live in milliseconds.
-   */
-  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
-
-    if (cols.hasColumns()) {
-      Put put = createPut(rowKey, cols, durability, timeToLiveMillis);
-      mutations.add(put);
-    }
-
-    if (cols.hasCounters()) {
-      Increment inc = createIncrement(rowKey, cols, durability, timeToLiveMillis);
-      mutations.add(inc);
-    }
-
-    if (mutations.isEmpty()) {
-      Put put = new Put(rowKey);
-      put.setTTL(timeToLiveMillis);
-      mutations.add(put);
-    }
-  }
-
-  /**
-   * Remove all queued Mutations from the batch.
-   */
-  public void clearMutations() {
-    mutations.clear();
-  }
-
-  /**
-   * Submits all queued Mutations.
-   * @return The number of mutation submitted.
-   */
-  public int mutate() {
-    int mutationCount = mutations.size();
-    Object[] result = new Object[mutationCount];
-    try {
-      table.batch(mutations, result);
-      mutations.clear();
-
-    } catch (Exception e) {
-      String msg = String.format("'%d' HBase write(s) failed on table '%s'", size(mutations), tableName(table));
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
-    }
-
-    return mutationCount;
-  }
-
-  /**
-   * Adds a Get to the batch.
+   * <p><b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance
+   * impact. Doing full table scans in HBase can adversely impact performance.
    *
-   * @param rowKey   The row key of the Get
-   * @param criteria Defines the columns/families that will be retrieved.
-   */
-  public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
-    Get get = new Get(rowKey);
-
-    if (criteria != null) {
-      criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf));
-      criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier()));
-    }
-
-    // queue the get
-    this.gets.add(get);
-  }
-
-  /**
-   * Clears all queued Gets from the batch.
+   * @return List of all row keys as Strings for this table.
    */
-  public void clearGets() {
-    gets.clear();
-  }
+  List<String> scanRowKeys() throws IOException;
 
   /**
-   * Submit all queued Gets.
+   * Scans the table and returns each result.
    *
-   * @return The Result of each queued Get.
-   */
-  public Result[] getAll() {
-    try {
-      Result[] results = table.get(gets);
-      gets.clear();
-      return results;
-
-    } catch (Exception e) {
-      String msg = String.format("'%d' HBase read(s) failed on table '%s'", size(gets), tableName(table));
-      LOG.error(msg, e);
-      throw new RuntimeException(msg, e);
-    }
-  }
-
-  /**
-   * Close the table.
-   */
-  @Override
-  public void close() throws IOException {
-    if(table != null) {
-      table.close();
-    }
-  }
-
-  /**
-   * Creates an HBase Put.
+   * <p><b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance
+   * impact. Doing full table scans in HBase can adversely impact performance.
    *
-   * @param rowKey     The row key.
-   * @param cols       The columns to put.
-   * @param durability The durability of the put.
+   * @return The results from the scan.
+   * @throws IOException
    */
-  private Put createPut(byte[] rowKey, ColumnList cols, Durability durability) {
-    Put put = new Put(rowKey);
-    put.setDurability(durability);
-    addColumns(cols, put);
-    return put;
-  }
+  Result[] scan(int numRows) throws IOException;
 
   /**
-   * Creates an HBase Put.
+   * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or
+   * increment.  The operation is enqueued for later execution.
    *
-   * @param rowKey           The row key.
-   * @param cols             The columns to put.
-   * @param durability       The durability of the put.
-   * @param timeToLiveMillis The TTL in milliseconds.
+   * @param rowKey     The row key of the Mutation.
+   * @param cols       The columns affected by the Mutation.
    */
-  private Put createPut(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
-    Put put = new Put(rowKey);
-    put.setDurability(durability);
-    put.setTTL(timeToLiveMillis);
-    addColumns(cols, put);
-    return put;
-  }
+  void addMutation(byte[] rowKey, ColumnList cols);
 
   /**
-   * Adds the columns to the Put
+   * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or
+   * increment.  The operation is enqueued for later execution.
    *
-   * @param cols The columns to add.
-   * @param put  The Put.
+   * @param rowKey     The row key of the Mutation.
+   * @param cols       The columns affected by the Mutation.
+   * @param durability The durability of the mutation.
    */
-  private void addColumns(ColumnList cols, Put put) {
-    for (ColumnList.Column col : cols.getColumns()) {
-
-      if (col.getTs() > 0) {
-        put.add(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
-
-      } else {
-        put.add(col.getFamily(), col.getQualifier(), col.getValue());
-      }
-    }
-  }
+  void addMutation(byte[] rowKey, ColumnList cols, Durability durability);
 
   /**
-   * Creates an HBase Increment for a counter.
+   * Enqueues a {@link org.apache.hadoop.hbase.client.Mutation} such as a put or
+   * increment.  The operation is enqueued for later execution.
    *
-   * @param rowKey     The row key.
-   * @param cols       The columns to include.
-   * @param durability The durability of the increment.
+   * @param rowKey           The row key of the Mutation.
+   * @param cols             The columns affected by the Mutation.
+   * @param durability       The durability of the mutation.
+   * @param timeToLiveMillis The time to live in milliseconds.
    */
-  private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability) {
-    Increment inc = new Increment(rowKey);
-    inc.setDurability(durability);
-    cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
-    return inc;
-  }
+  void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis);
 
   /**
-   * Creates an HBase Increment for a counter.
+   * Ensures that all pending mutations have completed.
    *
-   * @param rowKey     The row key.
-   * @param cols       The columns to include.
-   * @param durability The durability of the increment.
+   * @return The number of operations completed.
    */
-  private Increment createIncrement(byte[] rowKey, ColumnList cols, Durability durability, long timeToLiveMillis) {
-    Increment inc = new Increment(rowKey);
-    inc.setDurability(durability);
-    inc.setTTL(timeToLiveMillis);
-    cols.getCounters().forEach(cnt -> inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
-    return inc;
-  }
+  int mutate();
 
   /**
-   * Returns the name of the HBase table.
-   * <p>Attempts to avoid any null pointers that might be encountered along the way.
-   * @param table The table to retrieve the name of.
-   * @return The name of the table
+   * Clears all pending mutations.
    */
-  private static String tableName(HTableInterface table) {
-    String tableName = "null";
-    if(table != null) {
-      if(table.getName() != null) {
-        tableName = table.getName().getNameAsString();
-      }
-    }
-    return tableName;
-  }
+  void clearMutations();
 
   /**
-   * Puts a record into the configured HBase table synchronously (not batched).
+   * Delete a record by row key.
+   *
+   * @param rowKey The row key to delete.
    */
-  public void put(String rowKey, String columnFamily, String columnQualifier, String value)
-      throws IOException {
-    Put put = new Put(Bytes.toBytes(rowKey));
-    put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier),
-        Bytes.toBytes(value));
-    table.put(put);
-  }
+  void delete(byte[] rowKey);
 
   /**
-   * Scans an entire table returning all row keys as a List of Strings.
+   * Delete a column or set of columns by row key.
    *
-   * <p>
-   * <b>**WARNING**:</b> Do not use this method unless you're absolutely crystal clear about the performance
-   * impact. Doing full table scans in HBase can adversely impact performance.
-   *
-   * @return List of all row keys as Strings for this table.
+   * @param rowKey The row key to delete.
+   * @param columnList The set of columns to delete.
    */
-  public List<String> readRecords() throws IOException {
-    Scan scan = new Scan();
-    ResultScanner scanner = table.getScanner(scan);
-    List<String> rows = new ArrayList<>();
-    for (Result r = scanner.next(); r != null; r = scanner.next()) {
-      rows.add(Bytes.toString(r.getRow()));
-    }
-    return rows;
-  }
-
+  void delete(byte[] rowKey, ColumnList columnList);
 }
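
A minimal sketch of the read path against the new interface: gets are enqueued with an optional projection and then submitted in one batch by getAll(). It assumes a client obtained elsewhere (for example from HBaseTableClientFactory) and relies on the existing HBaseProjectionCriteria builder methods; the column family name is illustrative.

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.metron.hbase.HBaseProjectionCriteria;
    import org.apache.metron.hbase.client.HBaseClient;

    public class ReadSketch {
      public static Result[] readAll(HBaseClient client, byte[]... rowKeys) {
        // limit each get to the 'u' column family
        HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
        criteria.addColumnFamily("u");

        for (byte[] rowKey : rowKeys) {
          client.addGet(rowKey, criteria);   // only enqueued at this point
        }
        return client.getAll();              // submits every pending get
      }
    }
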
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java
new file mode 100644
index 0000000..b38b07b
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClientFactory.java
@@ -0,0 +1,70 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+import java.util.function.Supplier;
+
+/**
+ * Responsible for creating an {@link HBaseTableClient}.
+ */
+public interface HBaseClientFactory extends Serializable {
+  Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * @param factory The connection factory for creating connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   * @return An {@link HBaseTableClient}.
+   */
+  HBaseClient create(HBaseConnectionFactory factory, Configuration configuration, String tableName);
+
+  /**
+   * Instantiates a new {@link HBaseClientFactory} by class name.
+   *
+   * @param className The class name of the {@link HBaseClientFactory} to instantiate.
+   * @param defaultImpl The default instance to instantiate if the className is invalid.
+   * @return A new {@link HBaseClientFactory}.
+   */
+  static HBaseClientFactory byName(String className, Supplier<HBaseClientFactory> defaultImpl) {
+    LOG.debug("Creating HBase client creator; className={}", className);
+
+    if(className == null || className.length() == 0 || className.charAt(0) == '$') {
+      LOG.debug("Using default hbase client creator");
+      return defaultImpl.get();
+
+    } else {
+      try {
+        Class<? extends HBaseClientFactory> clazz = (Class<? extends HBaseClientFactory>) Class.forName(className);
+        return clazz.getConstructor().newInstance();
+
+      } catch(InstantiationException | IllegalAccessException | InvocationTargetException |
+              NoSuchMethodException | ClassNotFoundException e) {
+        throw new IllegalStateException("Unable to instantiate connector.", e);
+      }
+    }
+  }
+}
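
HBaseClientFactory.byName resolves the factory implementation from configuration, treating null, empty, or an unresolved placeholder starting with '$' as "use the default". A sketch of how a component might wire that up; the configuration handling shown here is hypothetical:

    import org.apache.metron.hbase.client.HBaseClientFactory;
    import org.apache.metron.hbase.client.HBaseTableClientFactory;

    public class FactoryLookupSketch {
      public static HBaseClientFactory resolve(String configuredClassName) {
        // configuredClassName may be null, empty, or a placeholder like "${hbase.client.factory}"
        return HBaseClientFactory.byName(configuredClassName, HBaseTableClientFactory::new);
      }
    }
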
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java
new file mode 100644
index 0000000..bda26c5
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseConnectionFactory.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Establishes a {@link Connection} to HBase.
+ */
+public class HBaseConnectionFactory implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public Connection createConnection(Configuration configuration) throws IOException {
+    return ConnectionFactory.createConnection(configuration);
+  }
+
+  /**
+   * Creates an {@link HBaseConnectionFactory} based on a fully-qualified class name.
+   *
+   * @param className The fully-qualified class name to instantiate.
+   * @return A {@link HBaseConnectionFactory}.
+   */
+  public static HBaseConnectionFactory byName(String className) {
+    LOG.debug("Creating HBase connection factory; className={}", className);
+    try {
+      Class<? extends HBaseConnectionFactory> clazz = (Class<? extends HBaseConnectionFactory>) Class.forName(className);
+      return clazz.getConstructor().newInstance();
+
+    } catch (InstantiationException | NoSuchMethodException | IllegalAccessException | ClassNotFoundException | InvocationTargetException e) {
+      throw new IllegalStateException("Unable to instantiate HBaseConnectionFactory.", e);
+    }
+  }
+}
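
Because connection creation now goes through HBaseConnectionFactory, tests and integration code can substitute their own factory, either by subclassing or via byName with a class name. A hypothetical subclass, not part of this patch, that reuses a single shared connection:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.metron.hbase.client.HBaseConnectionFactory;

    public class SharedConnectionFactory extends HBaseConnectionFactory {
      private static Connection shared;

      public static void setConnection(Connection connection) {
        shared = connection;   // e.g. a connection created once by a test harness
      }

      @Override
      public Connection createConnection(Configuration configuration) throws IOException {
        // fall back to a real connection when no shared one has been registered
        return shared != null ? shared : super.createConnection(configuration);
      }
    }
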
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
new file mode 100644
index 0000000..60d2328
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
@@ -0,0 +1,285 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.commons.collections4.CollectionUtils.size;
+
+/**
+ * An {@link HBaseClient} that uses the {@link Table} API to interact with HBase.
+ */
+public class HBaseTableClient implements HBaseClient {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private List<Mutation> mutations;
+  private List<Get> gets;
+  private Connection connection;
+  private Table table;
+
+  /**
+   * @param connectionFactory Creates connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   */
+  public HBaseTableClient(HBaseConnectionFactory connectionFactory, Configuration configuration, String tableName) throws IOException {
+    gets = new ArrayList<>();
+    mutations = new ArrayList<>();
+    connection = connectionFactory.createConnection(configuration);
+    table = connection.getTable(TableName.valueOf(tableName));
+  }
+
+  @Override
+  public void close() {
+    try {
+      if(table != null) {
+        table.close();
+      }
+    } catch(IOException e) {
+      LOG.error("Error while closing HBase table", e);
+    }
+
+    try {
+      if(connection != null) {
+        connection.close();
+      }
+    } catch(IOException e) {
+      LOG.error("Error while closing HBase connection",e);
+    }
+  }
+
+  @Override
+  public void addGet(byte[] rowKey, HBaseProjectionCriteria criteria) {
+    Get get = new Get(rowKey);
+
+    // define which column families and columns are needed
+    if (criteria != null) {
+      criteria.getColumnFamilies().forEach(cf -> get.addFamily(cf));
+      criteria.getColumns().forEach(col -> get.addColumn(col.getColumnFamily(), col.getQualifier()));
+    }
+
+    // queue the get
+    this.gets.add(get);
+  }
+
+  @Override
+  public Result[] getAll() {
+    try {
+      return table.get(gets);
+
+    } catch (Exception e) {
+      String msg = String.format("'%d' HBase read(s) failed on table '%s'", size(gets), tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+
+    } finally {
+      gets.clear();
+    }
+  }
+
+  @Override
+  public void clearGets() {
+    gets.clear();
+  }
+
+  @Override
+  public List<String> scanRowKeys() throws IOException {
+    List<String> rowKeys = new ArrayList<>();
+    ResultScanner scanner = getScanner();
+    for (Result r = scanner.next(); r != null; r = scanner.next()) {
+      String rowKeyAsString = Bytes.toString(r.getRow());
+      rowKeys.add(rowKeyAsString);
+    }
+    return rowKeys;
+  }
+
+  @Override
+  public Result[] scan(int numRows) throws IOException {
+    return getScanner().next(numRows);
+  }
+
+  private ResultScanner getScanner() throws IOException {
+    Scan scan = new Scan();
+    return table.getScanner(scan);
+  }
+
+  /**
+   * Returns the name of the HBase table.
+   * <p>Attempts to avoid any null pointers that might be encountered along the way.
+   * @param table The table to retrieve the name of.
+   * @return The name of the table
+   */
+  private static String tableName(Table table) {
+    String tableName = "null";
+    if(table != null) {
+      if(table.getName() != null) {
+        tableName = table.getName().getNameAsString();
+      }
+    }
+    return tableName;
+  }
+
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols) {
+    HBaseWriterParams params = new HBaseWriterParams();
+    addMutation(rowKey, cols, params);
+  }
+
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability) {
+    HBaseWriterParams params = new HBaseWriterParams()
+            .withDurability(durability);
+    addMutation(rowKey, cols, params);
+  }
+
+  @Override
+  public void addMutation(byte[] rowKey, ColumnList cols, Durability durability, Long timeToLiveMillis) {
+    HBaseWriterParams params = new HBaseWriterParams()
+            .withDurability(durability)
+            .withTimeToLive(timeToLiveMillis);
+    addMutation(rowKey, cols, params);
+  }
+
+  private void addMutation(byte[] rowKey, ColumnList cols, HBaseWriterParams params) {
+    if (cols.hasColumns()) {
+      Put put = createPut(rowKey, params);
+      addColumns(cols, put);
+      mutations.add(put);
+    }
+    if (cols.hasCounters()) {
+      Increment inc = createIncrement(rowKey, params);
+      addColumns(cols, inc);
+      mutations.add(inc);
+    }
+  }
+
+  @Override
+  public void clearMutations() {
+    mutations.clear();
+  }
+
+  @Override
+  public int mutate() {
+    int mutationCount = mutations.size();
+    if(mutationCount > 0) {
+      doMutate();
+    }
+
+    return mutationCount;
+  }
+
+  @Override
+  public void delete(byte[] rowKey) {
+    try {
+      Delete delete = new Delete(rowKey);
+      table.delete(delete);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to delete; table=%s", tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] rowKey, ColumnList columnList) {
+    try {
+      Delete delete = new Delete(rowKey);
+      for(ColumnList.Column column: columnList.getColumns()) {
+        delete.addColumn(column.getFamily(), column.getQualifier());
+      }
+      table.delete(delete);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to delete; table=%s", tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+
+  private void doMutate() {
+    Object[] result = new Object[mutations.size()];
+    try {
+      table.batch(mutations, result);
+
+    } catch (Exception e) {
+      String msg = String.format("'%d' HBase write(s) failed on table '%s'", size(mutations), tableName(table));
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+
+    } finally {
+      mutations.clear();
+    }
+  }
+
+  private Put createPut(byte[] rowKey, HBaseWriterParams params) {
+    Put put = new Put(rowKey);
+    if(params.getTimeToLiveMillis() > 0) {
+      put.setTTL(params.getTimeToLiveMillis());
+    }
+    put.setDurability(params.getDurability());
+    return put;
+  }
+
+  private void addColumns(ColumnList cols, Put put) {
+    for (ColumnList.Column col: cols.getColumns()) {
+      if (col.getTs() > 0) {
+        put.addColumn(col.getFamily(), col.getQualifier(), col.getTs(), col.getValue());
+      } else {
+        put.addColumn(col.getFamily(), col.getQualifier(), col.getValue());
+      }
+    }
+  }
+
+  private void addColumns(ColumnList cols, Increment inc) {
+    cols.getCounters().forEach(cnt ->
+            inc.addColumn(cnt.getFamily(), cnt.getQualifier(), cnt.getIncrement()));
+  }
+
+  private Increment createIncrement(byte[] rowKey, HBaseWriterParams params) {
+    Increment inc = new Increment(rowKey);
+    if(params.getTimeToLiveMillis() > 0) {
+      inc.setTTL(params.getTimeToLiveMillis());
+    }
+    inc.setDurability(params.getDurability());
+    return inc;
+  }
+}
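
A minimal sketch of the write path through HBaseTableClient: mutations are queued with optional durability and TTL, flushed by mutate(), and individual columns can be removed with delete(). The table, family, and qualifier names are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.metron.hbase.ColumnList;
    import org.apache.metron.hbase.client.HBaseClient;
    import org.apache.metron.hbase.client.HBaseConnectionFactory;
    import org.apache.metron.hbase.client.HBaseTableClient;

    public class WriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        try (HBaseClient client =
                 new HBaseTableClient(new HBaseConnectionFactory(), config, "enrichment")) {
          byte[] rowKey = Bytes.toBytes("row-1");

          // queue a put that expires after one hour, then flush the batch
          ColumnList cols = new ColumnList().addColumn("t", "v", "{}");
          client.addMutation(rowKey, cols, Durability.SYNC_WAL, 3_600_000L);
          int written = client.mutate();
          System.out.println(written + " mutation(s) written");

          // remove only the column that was just written
          client.delete(rowKey, new ColumnList().addColumn("t", "v"));
        }
      }
    }
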
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
new file mode 100644
index 0000000..8697e72
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClientFactory.java
@@ -0,0 +1,54 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Creates an {@link HBaseTableClient}.
+ */
+public class HBaseTableClientFactory implements HBaseClientFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * @param factory The factory that creates connections to HBase.
+   * @param configuration The HBase configuration.
+   * @param tableName The name of the HBase table.
+   * @return An {@link HBaseTableClient} that behaves synchronously.
+   */
+  @Override
+  public HBaseClient create(HBaseConnectionFactory factory,
+                            Configuration configuration,
+                            String tableName) {
+    try {
+      LOG.debug("Creating HBase client; table={}", tableName);
+      return new HBaseTableClient(factory, configuration, tableName);
+
+    } catch (Exception e) {
+      String msg = String.format("Unable to open connection to HBase for table '%s'", tableName);
+      LOG.error(msg, e);
+      throw new RuntimeException(msg, e);
+    }
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java
new file mode 100644
index 0000000..ec93177
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseWriterParams.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.client;
+
+import org.apache.hadoop.hbase.client.Durability;
+
+/**
+ * Parameters that define how an {@link HBaseClient} writes to HBase.
+ */
+public class HBaseWriterParams {
+  private Durability durability;
+  private Long timeToLiveMillis;
+
+  public HBaseWriterParams() {
+    durability = Durability.USE_DEFAULT;
+    timeToLiveMillis = 0L;
+  }
+
+  public HBaseWriterParams withDurability(Durability durability) {
+    this.durability = durability;
+    return this;
+  }
+
+  public HBaseWriterParams withTimeToLive(Long timeToLiveMillis) {
+    this.timeToLiveMillis = timeToLiveMillis;
+    return this;
+  }
+
+  public Durability getDurability() {
+    return durability;
+  }
+
+  public Long getTimeToLiveMillis() {
+    return timeToLiveMillis;
+  }
+}
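
The fluent setters above keep write tuning out of constructor signatures. A small, hypothetical fragment showing the intended use (the one-hour TTL is an arbitrary example value):

    // fragment only; assumes the usual imports for Durability, TimeUnit and HBaseWriterParams
    HBaseWriterParams params = new HBaseWriterParams()
            .withDurability(Durability.SYNC_WAL)
            .withTimeToLive(TimeUnit.HOURS.toMillis(1));

    // consumers read the settings back when building mutations, as the Increment
    // construction earlier in this patch does via getDurability() and getTimeToLiveMillis()
    Durability durability = params.getDurability();   // SYNC_WAL
    long ttlMillis = params.getTimeToLiveMillis();    // 3600000

Note that the default TTL of 0 is treated as "no TTL"; the client only calls setTTL() when getTimeToLiveMillis() is greater than zero, as the if(params.getTimeToLiveMillis() > 0) guard earlier in this patch shows.
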
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
similarity index 98%
copy from metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
copy to metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
index d0d934e..d3f22c0 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/LegacyHBaseClient.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A client that interacts with HBase.
  */
-public class HBaseClient implements Closeable {
+public class LegacyHBaseClient implements Closeable {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -66,7 +66,7 @@ public class HBaseClient implements Closeable {
    */
   private HTableInterface table;
 
-  public HBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
+  public LegacyHBaseClient(TableProvider provider, final Configuration configuration, final String tableName) {
     this.mutations = new ArrayList<>();
     this.gets = new ArrayList<>();
     try {
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
index 1983fc7..f9a9f1f 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/HBaseClientTest.java
@@ -63,7 +63,7 @@ public class HBaseClientTest {
   private static final String tableName = "table";
 
   private static HBaseTestingUtility util;
-  private static HBaseClient client;
+  private static LegacyHBaseClient client;
   private static HTableInterface table;
   private static Admin admin;
   private static byte[] cf = Bytes.toBytes("cf");
@@ -87,7 +87,7 @@ public class HBaseClientTest {
     table = util.createTable(Bytes.toBytes(tableName), cf);
     util.waitTableEnabled(table.getName());
     // setup the client
-    client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
+    client = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), tableName);
   }
 
   @AfterClass
@@ -259,7 +259,7 @@ public class HBaseClientTest {
     TableProvider tableProvider = mock(TableProvider.class);
     when(tableProvider.getTable(any(), any())).thenThrow(new IllegalArgumentException("test exception"));
 
-    client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
+    client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
   }
 
   @Test(expected = RuntimeException.class)
@@ -271,7 +271,7 @@ public class HBaseClientTest {
     TableProvider tableProvider = mock(TableProvider.class);
     when(tableProvider.getTable(any(), any())).thenReturn(table);
 
-    client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
+    client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
     client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
     client.mutate();
   }
@@ -288,7 +288,7 @@ public class HBaseClientTest {
     HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
     criteria.addColumnFamily(Bytes.toString(cf));
 
-    client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
+    client = new LegacyHBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
     client.addGet(rowKey1, criteria);
     client.addGet(rowKey2, criteria);
     client.getAll();
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java
new file mode 100644
index 0000000..d9db3cf
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/metron/hbase/client/integration/HBaseTableClientIntegrationTest.java
@@ -0,0 +1,286 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.client.integration;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * An integration test for the {@link HBaseTableClient}.
+ */
+public class HBaseTableClientIntegrationTest {
+  private static final String tableName = "widgets";
+  private static final String columnFamily = "W";
+  private static final byte[] columnFamilyB = Bytes.toBytes(columnFamily);
+  private static final String columnQualifier = "column";
+  private static final byte[] columnQualifierB = Bytes.toBytes(columnQualifier);
+  private static final String rowKey1String = "row-key-1";
+  private static final byte[] rowKey1 = Bytes.toBytes(rowKey1String);
+  private static final String rowKey2String = "row-key-2";
+  private static final byte[] rowKey2 = Bytes.toBytes(rowKey2String);
+  private static HBaseTestingUtility util;
+  private static Table table;
+  private HBaseTableClient client;
+
+  @BeforeClass
+  public static void startHBase() throws Exception {
+    Configuration config = HBaseConfiguration.create();
+    config.set("hbase.master.hostname", "localhost");
+    config.set("hbase.regionserver.hostname", "localhost");
+
+    util = new HBaseTestingUtility(config);
+    util.startMiniCluster();
+
+    // create the table
+    table = util.createTable(TableName.valueOf(tableName), columnFamily);
+    util.waitTableEnabled(table.getName());
+  }
+
+  @AfterClass
+  public static void stopHBase() throws Exception {
+    util.deleteTable(table.getName());
+    util.shutdownMiniCluster();
+    util.cleanupTestDir();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    client = new HBaseTableClient(new HBaseConnectionFactory(), util.getConfiguration(), tableName);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    // delete all records in the table
+    List<Delete> deletions = new ArrayList<>();
+    for(Result r : table.getScanner(new Scan())) {
+      deletions.add(new Delete(r.getRow()));
+    }
+    table.delete(deletions);
+
+    if(client != null) {
+      client.close();
+    }
+  }
+
+  @Test
+  public void testMutate() throws Exception {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, columnQualifier, "value1");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals("value1", getValue(results[0], columnFamily, columnQualifier));
+  }
+
+  @Test
+  public void testMutateMultipleColumns() throws Exception {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertEquals("value1", getValue(results[0], columnFamily, "col1"));
+    assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+  }
+
+  @Test
+  public void testNoMutations() throws Exception {
+    // do not add any mutations before attempting to write
+    int count = client.mutate();
+    Assert.assertEquals(0, count);
+
+    // attempt to read
+    HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumnFamily(columnFamily);
+    client.addGet(rowKey1, criteria);
+    client.addGet(rowKey2, criteria);
+    Result[] results = client.getAll();
+
+    // a result is returned for each get, but each should be empty
+    assertEquals(2, results.length);
+    for(Result result : results) {
+      Assert.assertTrue(result.isEmpty());
+    }
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table
+    Result[] results = client.scan(10);
+    assertEquals(1, results.length);
+
+    assertArrayEquals(rowKey1, results[0].getRow());
+    String actual1 = Bytes.toString(results[0].getValue(columnFamilyB, columnQualifierB));
+    assertEquals("value1", actual1);
+  }
+
+  @Test
+  public void testScanLimit() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table, but limit to 1 result
+    Result[] results = client.scan(1);
+    assertEquals(1, results.length);
+  }
+
+  @Test
+  public void testScanNothing() throws Exception {
+    // scan the table, but there is nothing there
+    Result[] results = client.scan(1);
+    assertEquals(0, results.length);
+  }
+
+  @Test
+  public void testScanRowKeys() throws Exception {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    // scan the table
+    List<String> rowKeys = client.scanRowKeys();
+    List<String> expected = Arrays.asList(rowKey1String, rowKey2String);
+    assertEquals(new HashSet<>(expected), new HashSet<>(rowKeys));
+  }
+
+  @Test
+  public void testDelete() {
+    // write some values
+    client.addMutation(rowKey1, new ColumnList().addColumn(columnFamily, columnQualifier, "value1"), Durability.SKIP_WAL);
+    client.addMutation(rowKey2, new ColumnList().addColumn(columnFamily, columnQualifier, "value2"), Durability.SKIP_WAL);
+    client.mutate();
+
+    client.delete(rowKey1);
+
+    // the deleted row key should no longer exist
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Assert.assertTrue(client.getAll()[0].isEmpty());
+
+    // the other row key should remain
+    client.addGet(rowKey2, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Assert.assertFalse(client.getAll()[0].isEmpty());
+  }
+
+  @Test
+  public void testDeleteNothing() {
+    // nothing should blow up if we attempt to delete something that does not exist
+    client.delete(rowKey1);
+  }
+
+  @Test
+  public void testDeleteColumn() {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // delete a column
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertNull(getValue(results[0], columnFamily, "col1"));
+    assertEquals("value2", getValue(results[0], columnFamily, "col2"));
+  }
+
+  @Test
+  public void testDeleteAllColumns() {
+    // write some values
+    ColumnList columns = new ColumnList()
+            .addColumn(columnFamily, "col1", "value1")
+            .addColumn(columnFamily, "col2", "value2");
+    client.addMutation(rowKey1, columns, Durability.SKIP_WAL);
+    client.mutate();
+
+    // delete both columns individually
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col1"));
+    client.delete(rowKey1, new ColumnList().addColumn(columnFamily, "col2"));
+
+    // read back the value
+    client.addGet(rowKey1, new HBaseProjectionCriteria().addColumnFamily(columnFamily));
+    Result[] results = client.getAll();
+
+    // validate
+    assertEquals(1, results.length);
+    assertNull(getValue(results[0], columnFamily, "col1"));
+    assertNull(getValue(results[0], columnFamily, "col2"));
+  }
+
+  private String getValue(Result result, String columnFamily, String columnQualifier) {
+    byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
+    return Bytes.toString(value);
+  }
+}
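
Pulling the read side together from the tests above, a hypothetical fragment (assume an enclosing method that declares throws Exception, as the tests do, and a client built the same way as in setup()):

    // fetch a single row, projecting only the 'W' column family
    client.addGet(Bytes.toBytes("row-key-1"),
            new HBaseProjectionCriteria().addColumnFamily("W"));
    Result[] results = client.getAll();

    // or pull back up to 10 rows, or just the row keys, via a scan
    Result[] firstTen = client.scan(10);
    List<String> rowKeys = client.scanRowKeys();

    // cell values come straight off the HBase Result
    String value = Bytes.toString(
            results[0].getValue(Bytes.toBytes("W"), Bytes.toBytes("column")));
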
diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml
index c839285..ffa7f3d 100644
--- a/metron-platform/metron-pcap/pom.xml
+++ b/metron-platform/metron-pcap/pom.xml
@@ -194,5 +194,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${global_httpclient_version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/pom.xml b/pom.xml
index 66a935e..f8c56ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,6 @@
         <base_flux_version>1.0.1</base_flux_version>
         <base_kafka_version>0.10.0</base_kafka_version>
         <base_hadoop_version>2.7.1</base_hadoop_version>
-        <base_hbase_version>1.1.1</base_hbase_version>
         <base_flume_version>1.5.2</base_flume_version>
         <!-- full dependency versions -->
         <global_accumulo_version>1.8.0</global_accumulo_version>
@@ -106,14 +105,12 @@
         <global_pcap_version>1.7.1</global_pcap_version>
         <global_kafka_version>0.10.0.1</global_kafka_version>
         <global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
-        <global_hbase_version>${base_hbase_version}</global_hbase_version>
         <global_flume_version>${base_flume_version}</global_flume_version>
         <global_elasticsearch_version>5.6.14</global_elasticsearch_version>
         <global_json_simple_version>1.1.1</global_json_simple_version>
         <global_metrics_version>3.0.2</global_metrics_version>
         <global_junit_version>4.12</global_junit_version>
         <global_guava_version>17.0</global_guava_version>
-        <global_hbase_guava_version>12.0</global_hbase_guava_version>
         <global_json_schema_validator_version>2.2.5</global_json_schema_validator_version>
         <global_slf4j_version>1.7.7</global_slf4j_version>
         <global_opencsv_version>3.7</global_opencsv_version>
@@ -139,9 +136,27 @@
         <global_jacoco_version>0.8.3</global_jacoco_version>
         <argLine></argLine>
     </properties>
-
     <profiles>
         <profile>
+            <id>HDP-3.1</id>
+            <properties>
+                <hdp_version>3.1.0.0</hdp_version>
+                <global_hbase_version>2.0.2</global_hbase_version>
+                <global_hbase_guava_version>17.0</global_hbase_guava_version>
+            </properties>
+        </profile>
+        <profile>
+            <id>HDP-2.6</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <hdp_version>2.6.5.0</hdp_version>
+                <global_hbase_version>1.1.1</global_hbase_version>
+                <global_hbase_guava_version>12.0</global_hbase_guava_version>
+            </properties>
+        </profile>
+        <profile>
             <id>HDP-2.5.0.0</id>
             <properties>
                 <hdp_version>2.5.0.0</hdp_version>
@@ -149,10 +164,11 @@
                 <global_storm_kafka_version>1.2.2</global_storm_kafka_version>
                 <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
                 <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
+                <global_hbase_version>1.1.1</global_hbase_version>
+                <global_hbase_guava_version>12.0</global_hbase_guava_version>
             </properties>
         </profile>
     </profiles>
-
     <dependencyManagement>
         <dependencies>
             <dependency>