You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/08/08 20:40:49 UTC

[metron] branch feature/METRON-2088-support-hdp-3.1 updated: METRON-2177 Upgrade Profiler for HBase 2.0.2 (nickwallen) closes apache/metron#1458

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
     new 1e1afc3  METRON-2177 Upgrade Profiler for HBase 2.0.2 (nickwallen) closes apache/metron#1458
1e1afc3 is described below

commit 1e1afc33332ac3b4618a8f19ffcb02d65ad4b369
Author: nickwallen <ni...@nickallen.org>
AuthorDate: Thu Aug 8 16:40:26 2019 -0400

    METRON-2177 Upgrade Profiler for HBase 2.0.2 (nickwallen) closes apache/metron#1458
---
 .../profiler/client/HBaseProfilerClient.java       | 117 +++--
 .../client/HBaseProfilerClientFactory.java         | 101 ++++
 .../metron/profiler/client/ProfilerClient.java     |   3 +-
 .../profiler/client/ProfilerClientFactories.java   |  45 ++
 .../profiler/client/ProfilerClientFactory.java     |  36 ++
 .../metron/profiler/client/stellar/GetProfile.java | 255 +++++-----
 .../client/stellar/ProfilerClientConfig.java       |   8 +-
 .../metron/profiler/client/stellar/Util.java       |   2 +-
 .../profiler/client/stellar/VerboseProfile.java    | 165 +++----
 .../client/HBaseProfilerClientFactoryTest.java     | 108 +++++
 .../profiler/client/HBaseProfilerClientTest.java   | 287 ++++++------
 .../metron/profiler/client/ProfileWriter.java      | 125 -----
 .../profiler/client/stellar/GetProfileTest.java    | 512 ++++++---------------
 .../client/stellar/VerboseProfileTest.java         | 294 ++++++------
 .../metron/profiler/hbase/SaltyRowKeyBuilder.java  |   8 +
 .../profiler/hbase/ValueOnlyColumnBuilder.java     |   3 -
 .../profiler/DefaultMessageDistributorTest.java    |   2 -
 metron-analytics/metron-profiler-spark/pom.xml     |   6 +
 .../metron/profiler/spark/BatchProfiler.java       |   6 +-
 .../metron/profiler/spark/BatchProfilerConfig.java |   7 +-
 .../spark/function/HBaseWriterFunction.java        | 159 ++++---
 .../spark/BatchProfilerIntegrationTest.java        |  44 +-
 .../spark/function/HBaseWriterFunctionTest.java    |  96 ++--
 metron-analytics/metron-profiler-storm/pom.xml     |  10 +
 .../src/main/config/profiler.properties            |   2 +
 .../src/main/flux/profiler/remote.yaml             |   9 +-
 .../org/apache/metron/hbase/bolt/HBaseBolt.java    | 101 ++--
 .../apache/metron/hbase/bolt/HBaseBoltTest.java    |  19 +-
 .../java/org/apache/metron/hbase/bolt/Widget.java  |  84 ++++
 .../org/apache/metron/hbase/bolt/WidgetMapper.java |  71 +++
 .../storm/integration/ProfilerIntegrationTest.java |  50 +-
 .../package/templates/profiler.properties.j2       |   2 +
 .../metron/hbase/client/HBaseTableClient.java      |   2 +-
 .../metron/stellar/dsl/StellarFunctionInfo.java    |   6 +-
 .../functions/resolver/BaseFunctionResolver.java   |  18 +-
 .../dsl/functions/resolver/FunctionResolver.java   |  17 +-
 .../functions/resolver/SimpleFunctionResolver.java |  13 +-
 37 files changed, 1461 insertions(+), 1332 deletions(-)

diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
index 2e537da..b9d0384 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClient.java
@@ -20,11 +20,11 @@
 
 package org.apache.metron.profiler.client;
 
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.hbase.HBaseProjectionCriteria;
+import org.apache.metron.hbase.client.HBaseClient;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
@@ -44,7 +44,7 @@ public class HBaseProfilerClient implements ProfilerClient {
   /**
    * Used to access the profile data stored in HBase.
    */
-  private HTableInterface table;
+  private HBaseClient hbaseClient;
 
   /**
    * Generates the row keys necessary to scan HBase.
@@ -52,19 +52,22 @@ public class HBaseProfilerClient implements ProfilerClient {
   private RowKeyBuilder rowKeyBuilder;
 
   /**
-   * Knows how profiles are organized in HBase.
+   * Knows how profiles are organized by columns in HBase.
    */
   private ColumnBuilder columnBuilder;
 
+  /**
+   * The period duration in milliseconds for the profiles that will be read by this client.
+   */
   private long periodDurationMillis;
 
-  public HBaseProfilerClient(HTableInterface table,
+  public HBaseProfilerClient(HBaseClient hbaseClient,
                              RowKeyBuilder rowKeyBuilder,
                              ColumnBuilder columnBuilder,
                              long periodDurationMillis) {
-    setTable(table);
-    setRowKeyBuilder(rowKeyBuilder);
-    setColumnBuilder(columnBuilder);
+    this.rowKeyBuilder = rowKeyBuilder;
+    this.columnBuilder = columnBuilder;
+    this.hbaseClient = hbaseClient;
     this.periodDurationMillis = periodDurationMillis;
   }
 
@@ -82,7 +85,13 @@ public class HBaseProfilerClient implements ProfilerClient {
    * @return A list of values.
    */
   @Override
-  public <T> List<ProfileMeasurement> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, long start, long end, Optional<T> defaultValue) {
+  public <T> List<ProfileMeasurement> fetch(Class<T> clazz,
+                                            String profile,
+                                            String entity,
+                                            List<Object> groups,
+                                            long start,
+                                            long end,
+                                            Optional<T> defaultValue) {
     List<ProfilePeriod> periods = ProfilePeriod.visitPeriods(
             start,
             end,
@@ -93,6 +102,8 @@ public class HBaseProfilerClient implements ProfilerClient {
     return fetch(clazz, profile, entity, groups, periods, defaultValue);
   }
 
+
+
   /**
    * Fetch the values stored in a profile based on a set of timestamps.
    *
@@ -105,7 +116,12 @@ public class HBaseProfilerClient implements ProfilerClient {
    * @return A list of values.
    */
   @Override
-  public <T> List<ProfileMeasurement> fetch(Class<T> clazz, String profile, String entity, List<Object> groups, Iterable<ProfilePeriod> periods, Optional<T> defaultValue) {
+  public <T> List<ProfileMeasurement> fetch(Class<T> clazz,
+                                            String profile,
+                                            String entity,
+                                            List<Object> groups,
+                                            Iterable<ProfilePeriod> periods,
+                                            Optional<T> defaultValue) {
     // create a list of profile measurements that need fetched
     List<ProfileMeasurement> toFetch = new ArrayList<>();
     for(ProfilePeriod period: periods) {
@@ -120,59 +136,76 @@ public class HBaseProfilerClient implements ProfilerClient {
     return doFetch(toFetch, clazz, defaultValue);
   }
 
-  private <T> List<ProfileMeasurement> doFetch(List<ProfileMeasurement> measurements, Class<T> clazz, Optional<T> defaultValue) {
+  @Override
+  public void close() throws IOException {
+    if(hbaseClient != null) {
+      hbaseClient.close();
+    }
+  }
+
+  private <T> List<ProfileMeasurement> doFetch(List<ProfileMeasurement> measurements,
+                                               Class<T> clazz,
+                                               Optional<T> defaultValue) {
     List<ProfileMeasurement> values = new ArrayList<>();
 
-    // build the gets for HBase
+    // define which columns need to be fetched
     byte[] columnFamily = Bytes.toBytes(columnBuilder.getColumnFamily());
     byte[] columnQualifier = columnBuilder.getColumnQualifier("value");
-    List<Get> gets = new ArrayList<>();
+    HBaseProjectionCriteria.ColumnMetaData column = new HBaseProjectionCriteria.ColumnMetaData(columnFamily, columnQualifier);
+    HBaseProjectionCriteria criteria = new HBaseProjectionCriteria().addColumn(column);
+
     for(ProfileMeasurement measurement: measurements) {
       byte[] rowKey = rowKeyBuilder.rowKey(measurement);
-      Get get = new Get(rowKey).addColumn(columnFamily, columnQualifier);
-      gets.add(get);
+      hbaseClient.addGet(rowKey, criteria);
     }
 
     // query HBase
-    try {
-      Result[] results = table.get(gets);
-      for(int i = 0; i < results.length; ++i) {
-        Result result = results[i];
-        ProfileMeasurement measurement = measurements.get(i);
-
-        boolean exists = result.containsColumn(columnFamily, columnQualifier);
-        if(exists) {
-          // value found
-          byte[] value = result.getValue(columnFamily, columnQualifier);
-          measurement.withProfileValue(SerDeUtils.fromBytes(value, clazz));
-          values.add(measurement);
-
-        } else if(defaultValue.isPresent()) {
-          // no value found, use default value provided
-          measurement.withProfileValue(defaultValue.get());
-          values.add(measurement);
-
-        } else {
-          // no value found and no default provided. nothing to do
-        }
+    Result[] results = hbaseClient.getAll();
+    for(int i = 0; i < results.length; ++i) {
+      Result result = results[i];
+      ProfileMeasurement measurement = measurements.get(i);
+
+      boolean exists = result.containsColumn(columnFamily, columnQualifier);
+      if(exists) {
+        // value found
+        byte[] value = result.getValue(columnFamily, columnQualifier);
+        measurement.withProfileValue(SerDeUtils.fromBytes(value, clazz));
+        values.add(measurement);
+
+      } else if(defaultValue.isPresent()) {
+        // no value found, use default value provided
+        measurement.withProfileValue(defaultValue.get());
+        values.add(measurement);
+
+      } else {
+        // no value found and no default provided. nothing to do
       }
-    } catch(IOException e) {
-      throw new RuntimeException(e);
     }
 
     return values;
   }
 
+  protected HBaseClient getHbaseClient() {
+    return hbaseClient;
+  }
 
-  public void setTable(HTableInterface table) {
-    this.table = table;
+  protected RowKeyBuilder getRowKeyBuilder() {
+    return rowKeyBuilder;
   }
 
-  public void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
+  protected void setRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
     this.rowKeyBuilder = rowKeyBuilder;
   }
 
-  public void setColumnBuilder(ColumnBuilder columnBuilder) {
+  protected ColumnBuilder getColumnBuilder() {
+    return columnBuilder;
+  }
+
+  protected void setColumnBuilder(ColumnBuilder columnBuilder) {
     this.columnBuilder = columnBuilder;
   }
+
+  protected long getPeriodDurationMillis() {
+    return periodDurationMillis;
+  }
 }
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClientFactory.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClientFactory.java
new file mode 100644
index 0000000..9d83100
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/HBaseProfilerClientFactory.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_CONNECTION_FACTORY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
+
+/**
+ * Creates an {@link HBaseProfilerClient}.
+ */
+public class HBaseProfilerClientFactory implements ProfilerClientFactory {
+
+  /**
+   * The factory that provides the {@link HBaseClient} that is used to interact with HBase.
+   */
+  private HBaseClientFactory hBaseClientFactory;
+
+  public HBaseProfilerClientFactory() {
+    this(new HBaseTableClientFactory());
+  }
+
+  public HBaseProfilerClientFactory(HBaseClientFactory hBaseClientFactory) {
+    this.hBaseClientFactory = hBaseClientFactory;
+  }
+
+  @Override
+  public HBaseProfilerClient create(Map<String, Object> globals) {
+    // create the hbase client
+    String tableName = PROFILER_HBASE_TABLE.get(globals, String.class);
+    HBaseConnectionFactory connFactory = getConnectionFactory(globals);
+    Configuration config = HBaseConfiguration.create();
+    HBaseClient hbaseClient = hBaseClientFactory.create(connFactory, config, tableName);
+
+    // create the profiler client
+    RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(globals);
+    ColumnBuilder columnBuilder = getColumnBuilder(globals);
+    long periodDuration = getPeriodDurationInMillis(globals);
+    return new HBaseProfilerClient(hbaseClient, rowKeyBuilder, columnBuilder, periodDuration);
+  }
+
+  /**
+   * Creates the ColumnBuilder to use in accessing the profile data.
+   * @param global The global configuration.
+   */
+  private static ColumnBuilder getColumnBuilder(Map<String, Object> global) {
+    String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
+    return new ValueOnlyColumnBuilder(columnFamily);
+  }
+
+  /**
+   * Creates the RowKeyBuilder to use in accessing the profile data.
+   * @param global The global configuration.
+   */
+  private static RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
+    Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
+    return new SaltyRowKeyBuilder(saltDivisor, getPeriodDurationInMillis(global), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Create the {@link HBaseConnectionFactory} to use when accessing HBase.
+   * @param global The global configuration.
+   */
+  private static HBaseConnectionFactory getConnectionFactory(Map<String, Object> global) {
+    String clazzName = PROFILER_HBASE_CONNECTION_FACTORY.get(global, String.class);
+    return HBaseConnectionFactory.byName(clazzName);
+  }
+}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
index 161575f..7283d21 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClient.java
@@ -23,13 +23,14 @@ package org.apache.metron.profiler.client;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
 
+import java.io.Closeable;
 import java.util.List;
 import java.util.Optional;
 
 /**
  * An interface for a client capable of retrieving the profile data that has been persisted by the Profiler.
  */
-public interface ProfilerClient {
+public interface ProfilerClient extends Closeable {
 
   /**
    * Fetch the values stored in a profile based on a start and end timestamp.
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactories.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactories.java
new file mode 100644
index 0000000..5fb4145
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactories.java
@@ -0,0 +1,45 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import java.util.Map;
+
+/**
+ * Enumerates the available {@link ProfilerClientFactory} implementations.
+ */
+public enum ProfilerClientFactories implements ProfilerClientFactory {
+
+  /**
+   * The default factory that returns a {@link ProfilerClient} that interacts
+   * with profiles stored in HBase.
+   */
+  DEFAULT(new HBaseProfilerClientFactory());
+
+  private ProfilerClientFactory factory;
+
+  ProfilerClientFactories(ProfilerClientFactory factory) {
+    this.factory = factory;
+  }
+
+  @Override
+  public ProfilerClient create(Map<String, Object> globals) {
+    return factory.create(globals);
+  }
+}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactory.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactory.java
new file mode 100644
index 0000000..7f659a7
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/ProfilerClientFactory.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import java.util.Map;
+
+/**
+ * Responsible for creating a {@link ProfilerClient}.
+ */
+public interface ProfilerClientFactory {
+
+  /**
+   * Create a {@link ProfilerClient}.
+   *
+   * @param globals The global configuration.
+   * @return The {@link ProfilerClient}.
+   */
+  ProfilerClient create(Map<String, Object> globals);
+}
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
index a0e2bdf..5a45571 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/GetProfile.java
@@ -20,18 +20,12 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.HBaseProfilerClientFactory;
 import org.apache.metron.profiler.client.ProfilerClient;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClientFactories;
+import org.apache.metron.profiler.client.ProfilerClientFactory;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.stellar.dsl.Stellar;
@@ -42,21 +36,16 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_DEFAULT_VALUE;
 import static org.apache.metron.profiler.client.stellar.Util.getArg;
 import static org.apache.metron.profiler.client.stellar.Util.getEffectiveConfig;
-import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
+import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
 
 /**
  * A Stellar function that can retrieve data contained within a Profile.
@@ -91,105 +80,135 @@ import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationIn
         name="GET",
         description="Retrieves a series of values from a stored profile.",
         params={
-          "profile - The name of the profile.",
-          "entity - The name of the entity.",
-          "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
-          "groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' " +
-                    "list used during profile creation. Defaults to an empty list, meaning no groups.",
-          "config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
-                  "of the same name. Default is the empty Map, meaning no overrides."
+                "profile - The name of the profile.",
+                "entity - The name of the entity.",
+                "periods - The list of profile periods to fetch. Use PROFILE_WINDOW or PROFILE_FIXED.",
+                "groups - Optional - The groups to retrieve. Must correspond to the 'groupBy' " +
+                        "list used during profile creation. Defaults to an empty list, meaning no groups.",
+                "config_overrides - Optional - Map (in curly braces) of name:value pairs, each overriding the global config parameter " +
+                        "of the same name. Default is the empty Map, meaning no overrides."
         },
         returns="The selected profile measurements."
 )
 public class GetProfile implements StellarFunction {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final int PROFILE_ARG_INDEX = 0;
+  private static final int ENTITY_ARG_INDEX = 1;
+  private static final int PERIOD_ARG_INDEX = 2;
+  private static final int GROUPS_ARG_INDEX = 3;
+  private static final int CONFIG_OVERRIDES_ARG_INDEX = 4;
 
   /**
-   * Cached client that can retrieve profile values.
+   * Allows the function to retrieve persisted {@link ProfileMeasurement} values.
    */
-  private ProfilerClient client;
+  private ProfilerClient profilerClient;
 
   /**
-   * Cached value of config map actually used to construct the previously cached client.
+   * Creates the {@link ProfilerClient} used by this function.
    */
-  private Map<String, Object> cachedConfigMap = new HashMap<String, Object>(6);
+  private ProfilerClientFactory profilerClientFactory;
 
-  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  /**
+   * Last known global configuration used to create the {@link ProfilerClient}. If the
+   * global configuration changes, a new {@link ProfilerClient} needs to be constructed.
+   */
+  private Map<String, Object> lastKnownGlobals = new HashMap<>();
 
   /**
-   * Initialization.  No longer need to do anything in initialization,
-   * as all setup is done lazily and cached.
+   * The default constructor used during Stellar function resolution.
    */
-  @Override
-  public void initialize(Context context) {
+  public GetProfile() {
+    this(ProfilerClientFactories.DEFAULT);
   }
 
   /**
-   * Is the function initialized?
+   * The constructor used for testing.
+   * @param profilerClientFactory Creates the {@link ProfilerClient} used by this function.
    */
+  public GetProfile(ProfilerClientFactory profilerClientFactory) {
+    this.profilerClientFactory = profilerClientFactory;
+  }
+
   @Override
-  public boolean isInitialized() {
-    return true;
+  public void initialize(Context context) {
+    Map<String, Object> globals = getGlobals(context);
+    profilerClient = profilerClientFactory.create(globals);
   }
 
-  /**
-   * Apply the function.
-   * @param args The function arguments.
-   * @param context
-   */
   @Override
-  public Object apply(List<Object> args, Context context) throws ParseException {
+  public boolean isInitialized() {
+    return profilerClient != null;
+  }
 
-    String profile = getArg(0, String.class, args);
-    String entity = getArg(1, String.class, args);
-    Optional<List<ProfilePeriod>> periods = Optional.ofNullable(getArg(2, List.class, args));
-    //Optional arguments
-    @SuppressWarnings("unchecked")
-    List<Object> groups = null;
-    Map configOverridesMap = null;
-    if (args.size() < 4) {
-      // no optional args, so default 'groups' and configOverridesMap remains null.
-      groups = new ArrayList<>(0);
-    }
-    else if (args.get(3) instanceof List) {
-      // correct extensible usage
-      groups = getArg(3, List.class, args);
-      if (args.size() >= 5) {
-        configOverridesMap = getArg(4, Map.class, args);
-        if (configOverridesMap.isEmpty()) configOverridesMap = null;
-      }
-    }
-    else {
-      // Deprecated "varargs" style usage for groups_list
-      // configOverridesMap cannot be specified so it remains null.
-      groups = getGroupsArg(3, args);
+  @Override
+  public void close() throws IOException {
+    if(profilerClient != null) {
+      profilerClient.close();
     }
+  }
 
-    Map<String, Object> effectiveConfig = getEffectiveConfig(context, configOverridesMap);
-    Object defaultValue = null;
-    //lazily create new profiler client if needed
-    if (client == null || !cachedConfigMap.equals(effectiveConfig)) {
-      RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(effectiveConfig);
-      ColumnBuilder columnBuilder = getColumnBuilder(effectiveConfig);
-      HTableInterface table = getTable(effectiveConfig);
-      long periodDuration = getPeriodDurationInMillis(effectiveConfig);
-      client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDuration);
-      cachedConfigMap = effectiveConfig;
-    }
-    if(cachedConfigMap != null) {
-      defaultValue = ProfilerClientConfig.PROFILER_DEFAULT_VALUE.get(cachedConfigMap);
+  @Override
+  public Object apply(List<Object> args, Context context) throws ParseException {
+    // required arguments
+    String profile = getArg(PROFILE_ARG_INDEX, String.class, args);
+    String entity = getArg(ENTITY_ARG_INDEX, String.class, args);
+    List<ProfilePeriod> periods = getArg(PERIOD_ARG_INDEX, List.class, args);
+
+    // optional arguments
+    List<Object> groups = getGroups(args);
+    Map<String, Object> overrides = getOverrides(args);
+
+    // lazily create new profiler client if needed
+    Map<String, Object> effectiveConfig = getEffectiveConfig(context, overrides);
+    if (profilerClient == null || !lastKnownGlobals.equals(effectiveConfig)) {
+      profilerClient = profilerClientFactory.create(effectiveConfig);
+      lastKnownGlobals = effectiveConfig;
     }
 
-    List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups,
-            periods.orElse(new ArrayList<>(0)), Optional.ofNullable(defaultValue));
+    // is there a default value?
+    Optional<Object> defaultValue = Optional.empty();
+    if(effectiveConfig != null) {
+      defaultValue = Optional.ofNullable(PROFILER_DEFAULT_VALUE.get(effectiveConfig));
+    }
 
     // return only the value of each profile measurement
+    List<ProfileMeasurement> measurements = profilerClient.fetch(Object.class, profile, entity, groups, periods, defaultValue);
     List<Object> values = new ArrayList<>();
     for(ProfileMeasurement m: measurements) {
       values.add(m.getProfileValue());
     }
+
     return values;
   }
 
+  private Map<String, Object> getOverrides(List<Object> args) {
+    Map<String, Object> configOverridesMap = null;
+    if(args.size() > CONFIG_OVERRIDES_ARG_INDEX && args.get(GROUPS_ARG_INDEX) instanceof List) {
+      configOverridesMap = getArg(CONFIG_OVERRIDES_ARG_INDEX, Map.class, args);
+      if (configOverridesMap.isEmpty()) {
+        configOverridesMap = null;
+      }
+    }
+    return configOverridesMap;
+  }
+
+  private List<Object> getGroups(List<Object> args) {
+    List<Object> groups;
+    if (args.size() < CONFIG_OVERRIDES_ARG_INDEX) {
+      // no optional args were provided, so 'groups' defaults to an empty list.
+      groups = new ArrayList<>(0);
+
+    } else if (args.get(GROUPS_ARG_INDEX) instanceof List) {
+      // correct extensible usage
+      groups = getArg(GROUPS_ARG_INDEX, List.class, args);
+
+    } else {
+      // deprecated "varargs" style usage for groups_list
+      groups = getVarArgGroups(GROUPS_ARG_INDEX, args);
+    }
+    return groups;
+  }
+
   /**
    * Get the groups defined by the user.
    *
@@ -201,7 +220,7 @@ public class GetProfile implements StellarFunction {
    * @param args The function arguments.
    * @return The groups.
    */
-  private List<Object> getGroupsArg(int startIndex, List<Object> args) {
+  private static List<Object> getVarArgGroups(int startIndex, List<Object> args) {
     List<Object> groups = new ArrayList<>();
 
     for(int i=startIndex; i<args.size(); i++) {
@@ -212,76 +231,8 @@ public class GetProfile implements StellarFunction {
     return groups;
   }
 
-  /**
-   * Creates the ColumnBuilder to use in accessing the profile data.
-   * @param global The global configuration.
-   */
-  private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
-    ColumnBuilder columnBuilder;
-
-    String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
-    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-
-    return columnBuilder;
-  }
-
-  /**
-   * Creates the ColumnBuilder to use in accessing the profile data.
-   * @param global The global configuration.
-   */
-  private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
-
-    // how long is the profile period?
-    long duration = PROFILER_PERIOD.get(global, Long.class);
-    LOG.debug("profiler client: {}={}", PROFILER_PERIOD, duration);
-
-    // which units are used to define the profile period?
-    String configuredUnits = PROFILER_PERIOD_UNITS.get(global, String.class);
-    TimeUnit units = TimeUnit.valueOf(configuredUnits);
-    LOG.debug("profiler client: {}={}", PROFILER_PERIOD_UNITS, units);
-
-    // what is the salt divisor?
-    Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
-    LOG.debug("profiler client: {}={}", PROFILER_SALT_DIVISOR, saltDivisor);
-
-    return new SaltyRowKeyBuilder(saltDivisor, duration, units);
-  }
-
-  /**
-   * Create an HBase table used when accessing HBase.
-   * @param global The global configuration.
-   * @return
-   */
-  private HTableInterface getTable(Map<String, Object> global) {
-
-    String tableName = PROFILER_HBASE_TABLE.get(global, String.class);
-    TableProvider provider = getTableProvider(global);
-
-    try {
-      return provider.getTable(HBaseConfiguration.create(), tableName);
-
-    } catch (IOException e) {
-      throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName), e);
-    }
-  }
-
-  /**
-   * Create the TableProvider to use when accessing HBase.
-   * @param global The global configuration.
-   */
-  private TableProvider getTableProvider(Map<String, Object> global) {
-    String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);
-
-    TableProvider provider;
-    try {
-      @SuppressWarnings("unchecked")
-      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
-      provider = clazz.getConstructor().newInstance();
-
-    } catch (Exception e) {
-      provider = new HTableProvider();
-    }
-
-    return provider;
+  private static Map<String, Object> getGlobals(Context context) {
+    return (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
+            .orElse(Collections.emptyMap());
   }
 }
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
index 1715b23..08ee064 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/ProfilerClientConfig.java
@@ -20,8 +20,9 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.hbase.HTableProvider;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 
 import java.util.Map;
 
@@ -42,6 +43,11 @@ public enum ProfilerClientConfig {
   PROFILER_HBASE_TABLE_PROVIDER("hbase.provider.impl", HTableProvider.class.getName(), String.class),
 
   /**
+   * A global property that defines the name of the {@link HBaseConnectionFactory} implementation class.
+   */
+  PROFILER_HBASE_CONNECTION_FACTORY("hbase.connection.factory", HBaseConnectionFactory.class.getName(), String.class),
+
+  /**
    * A global property that defines the duration of each profile period.  This value
    * should be defined along with 'profiler.client.period.duration.units'.
    */
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
index ea85c56..304cbe3 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/Util.java
@@ -75,7 +75,7 @@ public class Util {
    * @return effective config Map with overrides applied.
    * @throws ParseException - if any override values are of wrong type.
    */
-  public static Map<String, Object> getEffectiveConfig(Context context , Map configOverridesMap ) throws ParseException {
+  public static Map<String, Object> getEffectiveConfig(Context context, Map configOverridesMap) throws ParseException {
     // ensure the required capabilities are defined
     final Context.Capabilities[] required = { GLOBAL_CONFIG };
     validateCapabilities(context, required);
diff --git a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
index 9e857aa..31106da 100644
--- a/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
+++ b/metron-analytics/metron-profiler-client/src/main/java/org/apache/metron/profiler/client/stellar/VerboseProfile.java
@@ -18,18 +18,12 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.HBaseProfilerClientFactory;
 import org.apache.metron.profiler.client.ProfilerClient;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClientFactories;
+import org.apache.metron.profiler.client.ProfilerClientFactory;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.ParseException;
 import org.apache.metron.stellar.dsl.Stellar;
@@ -45,15 +39,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_DEFAULT_VALUE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
 import static org.apache.metron.profiler.client.stellar.Util.getArg;
-import static org.apache.metron.profiler.client.stellar.Util.getPeriodDurationInMillis;
 import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
 
 /**
@@ -88,69 +76,101 @@ import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
         returns="A map for each profile measurement containing the profile name, entity, period, and value."
 )
 public class VerboseProfile implements StellarFunction {
-
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  protected static String PROFILE_KEY = "profile";
-  protected static String ENTITY_KEY = "entity";
-  protected static String PERIOD_KEY = "period";
-  protected static String PERIOD_START_KEY = "period.start";
-  protected static String PERIOD_END_KEY = "period.end";
-  protected static String VALUE_KEY = "value";
-  protected static String GROUPS_KEY = "groups";
-  private ProfilerClient client;
+  protected static final String PROFILE_KEY = "profile";
+  protected static final String ENTITY_KEY = "entity";
+  protected static final String PERIOD_KEY = "period";
+  protected static final String PERIOD_START_KEY = "period.start";
+  protected static final String PERIOD_END_KEY = "period.end";
+  protected static final String VALUE_KEY = "value";
+  protected static final String GROUPS_KEY = "groups";
+  private static final int PROFILE_ARG_INDEX = 0;
+  private static final int ENTITY_ARG_INDEX = 1;
+  private static final int PERIOD_ARG_INDEX = 2;
+  private static final int GROUPS_ARG_INDEX = 3;
+
+  /**
+   * The default constructor used during Stellar function resolution.
+   */
+  public VerboseProfile() {
+    this(ProfilerClientFactories.DEFAULT);
+  }
+
+  /**
+   * The constructor used for testing.
+   * @param profilerClientFactory The factory that creates the {@link ProfilerClient} used by this function.
+   */
+  public VerboseProfile(ProfilerClientFactory profilerClientFactory) {
+    this.profilerClientFactory = profilerClientFactory;
+  }
+
+  /**
+   * Allows the function to retrieve persisted {@link ProfileMeasurement} values.
+   */
+  private ProfilerClient profilerClient;
+
+  /**
+   * Creates the {@link ProfilerClient} used by this function.
+   */
+  private ProfilerClientFactory profilerClientFactory;
 
   @Override
   public void initialize(Context context) {
-    // nothing to do
+    // values stored in the global config that are used to initialize the ProfilerClient
+    // are read only once during initialization.  if those values change during a Stellar
+    // session, this function will not respond to them.  the Stellar session would need to be
+    // restarted for those changes to take effect.  this differs from the behavior of `PROFILE_GET`.
+    Map<String, Object> globals = getGlobals(context);
+    profilerClient = profilerClientFactory.create(globals);
   }
 
   @Override
   public boolean isInitialized() {
-    return true;
+    return profilerClient != null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if(profilerClient != null) {
+      profilerClient.close();
+    }
   }
 
   @Override
   public Object apply(List<Object> args, Context context) throws ParseException {
     // required arguments
-    String profile = getArg(0, String.class, args);
-    String entity = getArg(1, String.class, args);
-    List<ProfilePeriod> periods = getArg(2, List.class, args);
+    String profile = getArg(PROFILE_ARG_INDEX, String.class, args);
+    String entity = getArg(ENTITY_ARG_INDEX, String.class, args);
+    List<ProfilePeriod> periods = getArg(PERIOD_ARG_INDEX, List.class, args);
 
     // optional 'groups' argument
     List<Object> groups = new ArrayList<>();
-    if(args.size() >= 4) {
-      groups = getArg(3, List.class, args);
-    }
-
-    // get globals from the context
-    Map<String, Object> globals = (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
-            .orElse(Collections.emptyMap());
-
-    // lazily create the profiler client, if needed
-    if (client == null) {
-      RowKeyBuilder rowKeyBuilder = getRowKeyBuilder(globals);
-      ColumnBuilder columnBuilder = getColumnBuilder(globals);
-      HTableInterface table = getTable(globals);
-      long periodDuration = getPeriodDurationInMillis(globals);
-      client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDuration);
+    if(args.size() > GROUPS_ARG_INDEX) {
+      groups = getArg(GROUPS_ARG_INDEX, List.class, args);
     }
 
     // is there a default value?
     Optional<Object> defaultValue = Optional.empty();
+    Map<String, Object> globals = getGlobals(context);
     if(globals != null) {
       defaultValue = Optional.ofNullable(PROFILER_DEFAULT_VALUE.get(globals));
     }
 
-    List<ProfileMeasurement> measurements = client.fetch(Object.class, profile, entity, groups, periods, defaultValue);
-
     // render a view of each profile measurement
+    List<ProfileMeasurement> measurements = profilerClient.fetch(Object.class, profile, entity, groups, periods, defaultValue);
     List<Object> results = new ArrayList<>();
     for(ProfileMeasurement measurement: measurements) {
       results.add(render(measurement));
     }
+
     return results;
   }
 
+  private static Map<String, Object> getGlobals(Context context) {
+    return (Map<String, Object>) context.getCapability(GLOBAL_CONFIG)
+            .orElse(Collections.emptyMap());
+  }
+
   /**
    * Renders a view of the profile measurement.
    * @param measurement The profile measurement to render.
@@ -166,57 +186,4 @@ public class VerboseProfile implements StellarFunction {
     view.put(GROUPS_KEY, measurement.getGroups());
     return view;
   }
-
-  /**
-   * Creates the ColumnBuilder to use in accessing the profile data.
-   * @param global The global configuration.
-   */
-  private ColumnBuilder getColumnBuilder(Map<String, Object> global) {
-    String columnFamily = PROFILER_COLUMN_FAMILY.get(global, String.class);
-    return new ValueOnlyColumnBuilder(columnFamily);
-  }
-
-  /**
-   * Creates the ColumnBuilder to use in accessing the profile data.
-   * @param global The global configuration.
-   */
-  private RowKeyBuilder getRowKeyBuilder(Map<String, Object> global) {
-    Integer saltDivisor = PROFILER_SALT_DIVISOR.get(global, Integer.class);
-    return new SaltyRowKeyBuilder(saltDivisor, getPeriodDurationInMillis(global), TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Create an HBase table used when accessing HBase.
-   * @param global The global configuration.
-   * @return
-   */
-  private HTableInterface getTable(Map<String, Object> global) {
-    String tableName = PROFILER_HBASE_TABLE.get(global, String.class);
-    TableProvider provider = getTableProvider(global);
-    try {
-      return provider.getTable(HBaseConfiguration.create(), tableName);
-
-    } catch (IOException e) {
-      throw new IllegalArgumentException(String.format("Unable to access table: %s", tableName), e);
-    }
-  }
-
-  /**
-   * Create the TableProvider to use when accessing HBase.
-   * @param global The global configuration.
-   */
-  private TableProvider getTableProvider(Map<String, Object> global) {
-    String clazzName = PROFILER_HBASE_TABLE_PROVIDER.get(global, String.class);
-    TableProvider provider;
-    try {
-      @SuppressWarnings("unchecked")
-      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(clazzName);
-      provider = clazz.getConstructor().newInstance();
-
-    } catch (Exception e) {
-      provider = new HTableProvider();
-    }
-
-    return provider;
-  }
 }
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientFactoryTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientFactoryTest.java
new file mode 100644
index 0000000..8465256
--- /dev/null
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientFactoryTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+package org.apache.metron.profiler.client;
+
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_CONNECTION_FACTORY;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link HBaseProfilerClientFactory}.
+ */
+public class HBaseProfilerClientFactoryTest {
+
+  private static final String tableName = "table";
+  private static final String columnFamily = "columnFamily";
+  private static final Integer periodDuration = 23;
+  private static final TimeUnit periodDurationUnits = TimeUnit.MINUTES;
+  private static final Integer saltDivisor = 1000;
+  private static final long periodDurationMillis = periodDurationUnits.toMillis(23);
+  private HBaseProfilerClientFactory factory;
+
+  @Before
+  public void setup() {
+    factory = new HBaseProfilerClientFactory();
+  }
+
+  @Test
+  public void testCreate() {
+    Map<String, Object> globals = new HashMap<String, Object>() {{
+      put(PROFILER_HBASE_TABLE.getKey(), tableName);
+      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
+      put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor.toString());
+      put(PROFILER_HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
+      put(PROFILER_PERIOD.getKey(), periodDuration.toString());
+      put(PROFILER_PERIOD_UNITS.getKey(), periodDurationUnits.toString());
+    }};
+
+    HBaseProfilerClient client = factory.create(globals);
+    assertEquals(periodDurationMillis, client.getPeriodDurationMillis());
+
+    // validate the row key builder that is created
+    SaltyRowKeyBuilder rowKeyBuilder = (SaltyRowKeyBuilder) client.getRowKeyBuilder();
+    assertEquals(saltDivisor, (Integer) rowKeyBuilder.getSaltDivisor());
+    assertEquals(periodDurationMillis, rowKeyBuilder.getPeriodDurationMillis());
+
+    // validate the column builder that is created
+    ValueOnlyColumnBuilder columnBuilder = (ValueOnlyColumnBuilder) client.getColumnBuilder();
+    assertEquals(columnFamily, columnBuilder.getColumnFamily());
+  }
+
+  @Test
+  public void testCreateUsingDefaultValues() {
+    Map<String, Object> globals = new HashMap<String, Object>() {{
+      // without using a mock connection factory, the test will hang trying to connect to HBase
+      put(PROFILER_HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
+    }};
+
+    // find what the default values should be
+    final long defaultPeriodDuration = (Long) PROFILER_PERIOD.getDefault();
+    final TimeUnit defaultPeriodDurationUnits = TimeUnit.valueOf((String) PROFILER_PERIOD_UNITS.getDefault());
+    final long defaultPeriodDurationMillis = defaultPeriodDurationUnits.toMillis(defaultPeriodDuration);
+    final long defaultSaltDivisor = (Long) PROFILER_SALT_DIVISOR.getDefault();
+    final String defaultColumnFamily = (String) PROFILER_COLUMN_FAMILY.getDefault();
+
+    HBaseProfilerClient client = factory.create(globals);
+    assertEquals(defaultPeriodDurationMillis, client.getPeriodDurationMillis());
+
+    // validate the row key builder that is created
+    SaltyRowKeyBuilder rowKeyBuilder = (SaltyRowKeyBuilder) client.getRowKeyBuilder();
+    assertEquals(defaultSaltDivisor, rowKeyBuilder.getSaltDivisor());
+    assertEquals(defaultPeriodDurationMillis, rowKeyBuilder.getPeriodDurationMillis());
+
+    // validate the column builder that is created
+    ValueOnlyColumnBuilder columnBuilder = (ValueOnlyColumnBuilder) client.getColumnBuilder();
+    assertEquals(defaultColumnFamily, columnBuilder.getColumnFamily());
+  }
+}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
index cc3748e..f2dce99 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/HBaseProfilerClientTest.java
@@ -20,184 +20,187 @@
 
 package org.apache.metron.profiler.client;
 
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.common.utils.SerDeUtils;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseClient;
 import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.ProfilePeriod;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
-import org.apache.metron.stellar.common.StellarStatefulExecutor;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
- * Tests the HBaseProfilerClient.
- *
- * The naming used in this test attempts to be as similar to how the 'groupBy'
- * functionality might be used 'in the wild'.  This test involves reading and
- * writing two separate groups originating from the same Profile and Entity.
- * There is a 'weekdays' group which contains all measurements taken on weekdays.
- * There is also a 'weekend' group which contains all measurements taken on weekends.
+ * Tests the {@link HBaseProfilerClient}.
  */
 public class HBaseProfilerClientTest {
-
   private static final String tableName = "profiler";
   private static final String columnFamily = "P";
+  private static final byte[] columnFamilyB = Bytes.toBytes(columnFamily);
+  private static final byte[] columnQualifier = Bytes.toBytes("column");
   private static final long periodDuration = 15;
   private static final TimeUnit periodUnits = TimeUnit.MINUTES;
   private static final int periodsPerHour = 4;
-
-  private HBaseProfilerClient client;
-  private StellarStatefulExecutor executor;
-  private MockHTable table;
-  private ProfileWriter profileWriter;
+  private static final byte[] expectedRowKey = Bytes.toBytes("some-row-key");
+  private static final String profileName = "profile1";
+  private static final String entityName = "entity1";
+  private static final int profileValue = 1231121;
+  private static final byte[] profileValueB = SerDeUtils.toBytes(profileValue);
+  private long periodDurationMillis = periodUnits.toMillis(periodDuration);
+
+  private HBaseClient hbaseClient;
+  private HBaseProfilerClient profilerClient;
+  private ProfileMeasurement expected;
+  private RowKeyBuilder rowKeyBuilder;
+  private ColumnBuilder columnBuilder;
+  private Result expectedResult;
+  private Result emptyResult;
 
   @Before
-  public void setup() throws Exception {
-    table = new MockHTable(tableName, columnFamily);
-    executor = new DefaultStellarStatefulExecutor();
-
-    // writes values to be read during testing
-    long periodDurationMillis = periodUnits.toMillis(periodDuration);
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
-    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
-
-    client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
+  public void setup() {
+    // create a profile measurement used in the tests
+    expected = new ProfileMeasurement()
+            .withProfileName(profileName)
+            .withEntity(entityName)
+            .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+            .withProfileValue(profileValue);
+
+    // mock row key builder needs to return a row key for the profile measurement used in the tests
+    rowKeyBuilder = mock(RowKeyBuilder.class);
+    when(rowKeyBuilder.rowKey(any())).thenReturn(expectedRowKey);
+
+    // mock column builder - column family/qualifier comes from the column builder
+    columnBuilder = mock(ColumnBuilder.class);
+    when(columnBuilder.getColumnFamily()).thenReturn(columnFamily);
+    when(columnBuilder.getColumnQualifier(eq("value"))).thenReturn(columnQualifier);
+
+    // this mock is used to feed data to the profiler client while testing
+    hbaseClient = mock(HBaseTableClient.class);
+
+    // a result that matches the expected profile measurement that can be returned by the mock hbase client
+    expectedResult = mock(Result.class);
+    when(expectedResult.containsColumn(eq(columnFamilyB), eq(columnQualifier))).thenReturn(true);
+    when(expectedResult.getValue(eq(columnFamilyB), eq(columnQualifier))).thenReturn(profileValueB);
+
+    // an empty result to use in the tests
+    emptyResult = mock(Result.class);
+    when(emptyResult.containsColumn(any(), any())).thenReturn(false);
+
+    // create the profiler client that will be tested
+    profilerClient = new HBaseProfilerClient(hbaseClient, rowKeyBuilder, columnBuilder, periodDurationMillis);
   }
 
-  @After
-  public void tearDown() throws Exception {
-    table.clear();
+  @Test
+  public void shouldFetchProfileMeasurement() {
+    // need the hbase client to return a Result matching the expected profile measurement value
+    Result[] results = new Result[] { expectedResult };
+    when(hbaseClient.getAll()).thenReturn(results);
+
+    List<ProfileMeasurement> measurements = profilerClient.fetch(
+            Object.class,
+            expected.getProfileName(),
+            expected.getEntity(),
+            expected.getGroups(),
+            Arrays.asList(expected.getPeriod()),
+            Optional.empty());
+    assertEquals(1, measurements.size());
+    assertEquals(expected, measurements.get(0));
   }
 
   @Test
-  public void Should_ReturnMeasurements_When_DataExistsForAGroup() throws Exception {
-    final String profile = "profile1";
-    final String entity = "entity1";
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final int count = hours * periodsPerHour + 1;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-
-    // setup - write two groups of measurements - 'weekends' and 'weekdays'
-    ProfileMeasurement prototype = new ProfileMeasurement()
-            .withProfileName(profile)
-            .withEntity(entity)
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(prototype, count, Arrays.asList("weekdays"), val -> expectedValue);
-    profileWriter.write(prototype, count, Arrays.asList("weekends"), val -> 0);
-
-    long end = System.currentTimeMillis();
-    long start = end - TimeUnit.HOURS.toMillis(2);
-    {
-      //validate "weekday" results
-      List<Object> groups = Arrays.asList("weekdays");
-      List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, start, end, Optional.empty());
-      assertEquals(count, results.size());
-      results.forEach(actual -> {
-        assertEquals(profile, actual.getProfileName());
-        assertEquals(entity, actual.getEntity());
-        assertEquals(groups, actual.getGroups());
-        assertEquals(expectedValue, actual.getProfileValue());
-      });
-    }
-    {
-      //validate "weekend" results
-      List<Object> groups = Arrays.asList("weekends");
-      List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, start, end, Optional.empty());
-      assertEquals(count, results.size());
-      results.forEach(actual -> {
-        assertEquals(profile, actual.getProfileName());
-        assertEquals(entity, actual.getEntity());
-        assertEquals(groups, actual.getGroups());
-        assertEquals(0, actual.getProfileValue());
-      });
-    }
+  public void shouldFetchNothingWhenNothingThere() {
+    // the hbase client will indicate there are no hits
+    Result[] results = new Result[] { emptyResult };
+    when(hbaseClient.getAll()).thenReturn(results);
+
+    List<ProfileMeasurement> measurements = profilerClient.fetch(
+            Object.class,
+            expected.getProfileName(),
+            expected.getEntity(),
+            expected.getGroups(),
+            Arrays.asList(expected.getPeriod()),
+            Optional.empty());
+    assertEquals(0, measurements.size());
   }
 
   @Test
-  public void Should_ReturnResultFromGroup_When_MultipleGroupsExist() throws Exception {
-    final String profile = "profile1";
-    final String entity = "entity1";
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final int count = hours * periodsPerHour;
-    final long endTime = System.currentTimeMillis();
-    final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
-
-    // setup - write two groups of measurements - 'weekends' and 'weekdays'
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
-    profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
-
-    List<Object> weekdays = Arrays.asList("weekdays");
-    List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, weekdays, startTime, endTime, Optional.empty());
-
-    // should only return results from 'weekdays' group
-    assertEquals(count, results.size());
-    results.forEach(actual -> assertEquals(weekdays, actual.getGroups()));
+  public void shouldFetchDefaultValueWhenNothingThere() {
+    // the hbase client will indicate there are no hits
+    Result[] results = new Result[] { emptyResult };
+    when(hbaseClient.getAll()).thenReturn(results);
+
+    List<ProfileMeasurement> measurements = profilerClient.fetch(
+            Object.class,
+            expected.getProfileName(),
+            expected.getEntity(),
+            expected.getGroups(),
+            Arrays.asList(expected.getPeriod()),
+            Optional.of(profileValue));
+
+    // expect the default value to be returned
+    assertEquals(1, measurements.size());
+    assertEquals(expected, measurements.get(0));
   }
 
   @Test
-  public void Should_ReturnNoResults_When_GroupDoesNotExist() {
-    final String profile = "profile1";
-    final String entity = "entity1";
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final int count = hours * periodsPerHour;
-    final long endTime = System.currentTimeMillis();
-    final long startTime = endTime - TimeUnit.HOURS.toMillis(hours);
-
-    // create two groups of measurements - one on weekdays and one on weekends
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, Arrays.asList("weekdays"), val -> expectedValue);
-    profileWriter.write(m, count, Arrays.asList("weekends"), val -> 0);
-
-    // should return no results when the group does not exist
-    List<Object> groups = Arrays.asList("does-not-exist");
-    List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, groups, startTime, endTime, Optional.empty());
-    assertEquals(0, results.size());
+  public void shouldFetchMultipleProfilePeriods() {
+    // the hbase client returns four Results, each matching the expected profile measurement value
+    Result[] results = new Result[] { expectedResult, expectedResult, expectedResult, expectedResult };
+    when(hbaseClient.getAll()).thenReturn(results);
+
+    // fetch across four distinct, consecutive periods; each next() is chained from the prior period
+    ProfilePeriod start = ProfilePeriod.fromPeriodId(1L, 15L, TimeUnit.MINUTES);
+    List<ProfilePeriod> periods = new ArrayList<ProfilePeriod>() {{
+      add(start);
+      add(start.next());
+      add(start.next().next());
+      add(start.next().next().next());
+    }};
+
+    List<ProfileMeasurement> measurements = profilerClient.fetch(
+            Object.class,
+            expected.getProfileName(),
+            expected.getEntity(),
+            expected.getGroups(),
+            periods,
+            Optional.empty());
+
+    // the row key builder should be called once for each profile period
+    ArgumentCaptor<ProfileMeasurement> captor = ArgumentCaptor.forClass(ProfileMeasurement.class);
+    verify(rowKeyBuilder, times(4)).rowKey(captor.capture());
+
+    // the profile periods should match those originally submitted, in the same order
+    List<ProfileMeasurement> submitted = captor.getAllValues();
+    assertEquals(periods.get(0), submitted.get(0).getPeriod());
+    assertEquals(periods.get(1), submitted.get(1).getPeriod());
+    assertEquals(periods.get(2), submitted.get(2).getPeriod());
+    assertEquals(periods.get(3), submitted.get(3).getPeriod());
+
+    assertEquals(4, measurements.size());
   }
 
   @Test
-  public void Should_ReturnNoResults_When_NoDataInStartToEnd() throws Exception {
-    final String profile = "profile1";
-    final String entity = "entity1";
-    final int hours = 2;
-    int numberToWrite = hours * periodsPerHour;
-    final List<Object> group = Arrays.asList("weekends");
-    final long measurementTime = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
-
-    // write some data with a timestamp of s1 day ago
-    ProfileMeasurement prototype = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(measurementTime, periodDuration, periodUnits);
-    profileWriter.write(prototype, numberToWrite, group, val -> 1000);
-
-    // should return no results when [start,end] is long after when test data was written
-    final long endFetchAt = System.currentTimeMillis();
-    final long startFetchAt = endFetchAt - TimeUnit.MILLISECONDS.toMillis(30);
-    List<ProfileMeasurement> results = client.fetch(Integer.class, profile, entity, group, startFetchAt, endFetchAt, Optional.empty());
-    assertEquals(0, results.size());
+  public void shouldCloseHBaseClient() throws IOException {
+    profilerClient.close();
+    verify(hbaseClient, times(1)).close();
   }
 }
\ No newline at end of file
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
deleted file mode 100644
index d4e5e66..0000000
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/ProfileWriter.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.metron.profiler.client;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.ColumnList;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
-import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.ProfilePeriod;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-/**
- * Writes ProfileMeasurement values that can be read during automated testing.
- */
-public class ProfileWriter {
-
-  private RowKeyBuilder rowKeyBuilder;
-  private ColumnBuilder columnBuilder;
-  private LegacyHBaseClient hbaseClient;
-  private HBaseProfilerClient client;
-
-  public ProfileWriter(RowKeyBuilder rowKeyBuilder, ColumnBuilder columnBuilder, HTableInterface table, long periodDurationMillis) {
-    this.rowKeyBuilder = rowKeyBuilder;
-    this.columnBuilder = columnBuilder;
-    this.hbaseClient = new LegacyHBaseClient((c, t) -> table, table.getConfiguration(), table.getName().getNameAsString());
-    this.client = new HBaseProfilerClient(table, rowKeyBuilder, columnBuilder, periodDurationMillis);
-  }
-
-  /**
-   * Writes profile measurements that can be used for testing.
-   *
-   * @param prototype      A prototype for the types of ProfileMeasurements that should be written.
-   * @param count          The number of profile measurements to write.
-   * @param group          The name of the group.
-   * @param valueGenerator A function that consumes the previous ProfileMeasurement value and produces the next.
-   */
-  public void write(ProfileMeasurement prototype, int count, List<Object> group, Function<Object, Object> valueGenerator) {
-
-    ProfileMeasurement m = prototype;
-    ProfilePeriod period = m.getPeriod();
-    for(int i=0; i<count; i++) {
-      // generate the next value that should be written
-      Object nextValue = valueGenerator.apply(m.getProfileValue());
-
-      // write the measurement
-      m = new ProfileMeasurement()
-              .withProfileName(prototype.getProfileName())
-              .withEntity(prototype.getEntity())
-              .withPeriod(period)
-              .withGroups(group)
-              .withProfileValue(nextValue);
-      write(m);
-
-      // advance to the next period
-      period = m.getPeriod().next();
-    }
-  }
-
-  /**
-   * Write a ProfileMeasurement.
-   * @param m The ProfileMeasurement to write.
-   */
-  private void write(ProfileMeasurement m) {
-
-    byte[] rowKey = rowKeyBuilder.rowKey(m);
-    ColumnList cols = columnBuilder.columns(m);
-
-    hbaseClient.addMutation(rowKey, cols, Durability.SKIP_WAL);
-    hbaseClient.mutate();
-  }
-
-  public static void main(String[] args) throws Exception {
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
-    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder();
-
-    Configuration config = HBaseConfiguration.create();
-    config.set("hbase.master.hostname", "node1");
-    config.set("hbase.regionserver.hostname", "node1");
-    config.set("hbase.zookeeper.quorum", "node1");
-
-    HTableProvider provider = new HTableProvider();
-    HTableInterface table = provider.getTable(config, "profiler");
-
-    long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
-    long when = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2);
-    ProfileMeasurement measure = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("192.168.66.121")
-            .withPeriod(when, periodDurationMillis, TimeUnit.MILLISECONDS);
-
-    ProfileWriter writer = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
-    writer.write(measure, 2 * 24 * 4, Collections.emptyList(), val -> new Random().nextInt(10));
-  }
-}
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
index 3fbed1c..41afa04 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/GetProfileTest.java
@@ -20,437 +20,191 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.ProfileWriter;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClient;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
-import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.apache.metron.stellar.dsl.functions.resolver.SingletonFunctionResolver;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
- * Tests the GetProfile class.
+ * Tests the 'PROFILE_GET' function in the {@link GetProfile} class.
  */
 public class GetProfileTest {
 
-  private static final long periodDuration = 15;
-  private static final TimeUnit periodUnits = TimeUnit.MINUTES;
-  private static final int saltDivisor = 1000;
-  private static final String tableName = "profiler";
-  private static final String columnFamily = "P";
   private StellarStatefulExecutor executor;
-  private Map<String, Object> state;
-  private ProfileWriter profileWriter;
-  // different values of period and salt divisor, used to test config_overrides feature
-  private static final long periodDuration2 = 1;
-  private static final TimeUnit periodUnits2 = TimeUnit.HOURS;
-  private static final int saltDivisor2 = 2050;
-
-  private <T> T run(String expression, Class<T> clazz) {
-    return executor.execute(expression, state, clazz);
+  private FunctionResolver functionResolver;
+  private Map<String, Object> globals;
+  private GetProfile function;
+  private ProfilerClient profilerClient;
+  private List<Integer> results;
+  private ProfileMeasurement expected;
+  private ProfileMeasurement defaultMeasurement;
+  private Object defaultValue;
+
+  private List run(String expression) {
+    return executor.execute(expression, new HashMap<>(), List.class);
   }
 
-  /**
-   * This method sets up the configuration context for both writing profile data
-   * (using profileWriter to mock the complex process of what the Profiler topology
-   * actually does), and then reading that profile data (thereby testing the PROFILE_GET
-   * Stellar client implemented in GetProfile).
-   *
-   * It runs at @Before time, and sets testclass global variables used by the writers and readers.
-   * The various writers and readers are in each test case, not here.
-   *
-   * @return void
-   */
   @Before
   public void setup() {
-    state = new HashMap<>();
-    final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
-    // used to write values to be read during testing
-    long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
-    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
-
-    // global properties
-    Map<String, Object> global = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
-    }};
-
-    // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class),
-            new Context.Builder()
-                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-                    .build());
-  }
-
-  /**
-   * This method is similar to setup(), in that it sets up profiler configuration context,
-   * but only for the client.  Additionally, it uses periodDuration2, periodUnits2
-   * and saltDivisor2, instead of periodDuration, periodUnits and saltDivisor respectively.
-   *
-   * This is used in the unit tests that test the config_overrides feature of PROFILE_GET.
-   * In these tests, the context from @Before setup() is used to write the data, then the global
-   * context is changed to context2 (from this method).  Each test validates that a default read
-   * using global context2 then gets no valid results (as expected), and that a read using
-   * original context values in the PROFILE_GET config_overrides argument gets all expected results.
-   *
-   * @return context2 - The profiler client configuration context created by this method.
-   *    The context2 values are also set in the configuration of the StellarStatefulExecutor
-   *    stored in the global variable 'executor'.  However, there is no API for querying the
-   *    context values from a StellarStatefulExecutor, so we output the context2 Context object itself,
-   *    for validation purposes (so that its values can be validated as being significantly
-   *    different from the setup() settings).
-   */
-  private Context setup2() {
-    state = new HashMap<>();
+    // the mock profiler client used to feed profile measurement values to the function
+    profilerClient = mock(ProfilerClient.class);
 
     // global properties
-    Map<String, Object> global = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration2));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits2.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor2));
-    }};
-
-    // create the modified context
-    Context context2 = new Context.Builder()
-            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+    globals = new HashMap<>();
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
             .build();
 
-    // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class),
-            context2);
-
-    return context2; //because there is no executor.getContext() method
-  }
+    // the PROFILE_GET function that will be tested
+    function = new GetProfile(globals -> profilerClient);
+    function.initialize(context);
 
-  /**
-   * Values should be retrievable that have NOT been stored within a group.
-   */
-  @Test
-  public void testWithNoGroups() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
+    // create the stellar execution environment
+    functionResolver = new SimpleFunctionResolver()
+            .withClass(FixedLookback.class)
+            .withInstance(function);
+    executor = new DefaultStellarStatefulExecutor(functionResolver, context);
 
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
+    // create a profile measurement used in the tests
+    expected = new ProfileMeasurement()
             .withProfileName("profile1")
             .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // execute - read the profile values - no groups
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-    result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
-  }
-
-  /**
-   * Values should be retrievable that have been stored within a 'group'.
-   */
-  @Test
-  public void testWithOneGroup() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
+            .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+            .withProfileValue(1231121);
 
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
+    defaultValue = 7777777;
+    defaultMeasurement = new ProfileMeasurement()
             .withProfileName("profile1")
             .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // test the deprecated but allowed "varargs" form of groups specification
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekends')";
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-    result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
+            .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+            .withProfileValue(defaultValue);
   }
 
-  /**
-   * Values should be retrievable that have been stored within a 'group'.
-   */
   @Test
-  public void testWithTwoGroups() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekdays", "tuesday");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekdays', 'tuesday'])";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // test the deprecated but allowed "varargs" form of groups specification
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), 'weekdays', 'tuesday')";
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-    result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
+  public void testGetProfileValue() {
+    // only one profile measurement exists
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            any())).thenReturn(Arrays.asList(expected));
+
+    // expect the one measurement to be returned
+    results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+    assertEquals(1, results.size());
+    assertEquals(expected.getProfileValue(), results.get(0));
   }
 
-  /**
-   * Initialization should fail if the required context values are missing.
-   */
-  @Test(expected = ParseException.class)
-  public void testMissingContext() {
-    Context empty = Context.EMPTY_CONTEXT();
-
-    // 'unset' the context that was created during setup()
-    executor.setContext(empty);
-
-    // force re-initialization with no context
-    SingletonFunctionResolver.getInstance().initialize(empty);
-
-    // validate - function should be unable to initialize
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(1000, 'SECONDS'), groups)";
-    run(expr, List.class);
-  }
-
-  /**
-   * If the time horizon specified does not include any profile measurements, then
-   * none should be returned.
-   */
   @Test
-  public void testOutsideTimeHorizon() {
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write a single value from 2 hours ago
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, 1, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // execute - read the profile values
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - there should be no values from only 4 seconds ago
-    Assert.assertEquals(0, result.size());
+  public void testGetProfileValueWithGroup() {
+    // the profile measurement is part of a group
+    expected.withGroups(Arrays.asList("group1"));
+
+    // only one profile measurement exists
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            any())).thenReturn(Arrays.asList(expected));
+
+    // expect the one measurement to be returned
+    results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group1'])");
+    assertEquals(1, results.size());
+    assertEquals(expected.getProfileValue(), results.get(0));
   }
 
-  /**
-   * Default value should be able to be specified
-   */
   @Test
-  public void testWithDefaultValue() {
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to fail to read any values because we didn't write any.
-    Assert.assertEquals(0, result.size());
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    testOverride("{'profiler.default.value' : 0}", 0);
-    testOverride("{'profiler.default.value' : 'metron'}", "metron");
-    testOverride("{'profiler.default.value' : []}", new ArrayList<>());
+  public void testGetProfileValueWithDifferentGroup() {
+    // the profile measurement is part of a group
+    expected.withGroups(Arrays.asList("group1"));
+
+    // only one profile measurement exists
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            any())).thenReturn(Arrays.asList(expected));
+
+    // expect nothing to be returned; the requested group does not match the stubbed group
+    results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group999'])");
+    assertEquals(0, results.size());
   }
 
-  private void testOverride(String overrides, Object defaultVal) {
-      String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), [], " + overrides + ")";
-      List<Object> result = run(expr, List.class);
-
-      // validate - expect to read all values from the past 4 hours (16 or 17 values depending on start time)
-      // but they should all be the default value.
-      Assert.assertTrue(result.size() == 16 || result.size() == 17);
-      result.forEach(actual -> Assert.assertEquals(defaultVal, actual));
+  @Test
+  public void shouldReturnNothingWhenNoMeasurementsExist() {
+    // no measurements exist
+    when(profilerClient.fetch(
+            eq(Object.class),
+            any(),
+            any(),
+            any(),
+            any(),
+            any())).thenReturn(Collections.emptyList());
+
+    // expect no values to be returned
+    results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+    assertEquals(0, results.size());
   }
 
-  /**
-   * Values should be retrievable that were written with configuration different than current global config.
-   */
   @Test
-  public void testWithConfigOverride() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // now change the executor configuration
-    Context context2 = setup2();
-    // validate it is changed in significant way
-    @SuppressWarnings("unchecked")
-    Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
-    Assert.assertEquals(PROFILER_PERIOD.get(global), periodDuration2);
-    Assert.assertNotEquals(periodDuration, periodDuration2);
-
-    // execute - read the profile values - with (wrong) default global config values.
-    // No error message at this time, but returns empty results list, because
-    // row keys are not correctly calculated.
-    String expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to fail to read any values
-    Assert.assertEquals(0, result.size());
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
-            + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
-            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS', " + overrides + "), [], " + overrides + ")"
-            ;
-    result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-    result.forEach(actual -> Assert.assertEquals(expectedValue, actual.intValue()));
+  public void shouldUseDefaultValueFromGlobals() {
+    // set a default value
+    globals.put("profiler.default.value", defaultValue);
+
+    // the underlying profile client is responsible for returning the default value, if no profiles are found
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            eq(Optional.of(defaultValue)))).thenReturn(Arrays.asList(defaultMeasurement));
+
+    results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+    assertEquals(1, results.size());
+    assertEquals(defaultValue, results.get(0));
   }
 
-  /**
-   * Values should be retrievable that have been stored within a 'group', with
-   * configuration different than current global config.
-   * This time put the config_override case before the non-override case.
-   */
   @Test
-  public void testWithConfigAndOneGroup() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // now change the executor configuration
-    Context context2 = setup2();
-    // validate it is changed in significant way
-    @SuppressWarnings("unchecked")
-    Map<String, Object> global = (Map<String, Object>) context2.getCapability(Context.Capabilities.GLOBAL_CONFIG).get();
-    Assert.assertEquals(global.get(PROFILER_PERIOD.getKey()), Long.toString(periodDuration2));
-    Assert.assertNotEquals(periodDuration, periodDuration2);
-
-    // execute - read the profile values - with config_override.
-    // first two override values are strings, third is deliberately a number.
-    String overrides = "{'profiler.client.period.duration' : '" + periodDuration + "', "
-            + "'profiler.client.period.duration.units' : '" + periodUnits.toString() + "', "
-            + "'profiler.client.salt.divisor' : " + saltDivisor + " }";
-    String expr = "PROFILE_GET('profile1', 'entity1'" +
-            ", PROFILE_FIXED(4, 'HOURS', " + overrides + "), ['weekends'], " +
-            overrides + ")";
-    @SuppressWarnings("unchecked")
-    List<Integer> result = run(expr, List.class);
-
-    // validate - expect to read all values from the past 4 hours
-    Assert.assertEquals(count, result.size());
-
-    // execute - read the profile values - with (wrong) default global config values.
-    // No error message at this time, but returns empty results list, because
-    // row keys are not correctly calculated.
-    expr = "PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['weekends'])";
-    result = run(expr, List.class);
-
-    // validate - expect to fail to read any values
-    Assert.assertEquals(0, result.size());
+  public void shouldUseDefaultValueFromOverrides() {
+    // the underlying profile client is responsible for returning the default value, if no profiles are found
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            eq(Optional.of(defaultValue)))).thenReturn(Arrays.asList(defaultMeasurement));
+
+    // set the default value in the overrides map
+    String overrides = String.format( "{ 'profiler.default.value': %s }", defaultValue);
+    results = run("PROFILE_GET('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), [], " + overrides + ")");
+    assertEquals(1, results.size());
+    assertEquals(defaultValue, results.get(0));
   }
 }
diff --git a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
index bd39007..6869046 100644
--- a/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
+++ b/metron-analytics/metron-profiler-client/src/test/java/org/apache/metron/profiler/client/stellar/VerboseProfileTest.java
@@ -20,19 +20,13 @@
 
 package org.apache.metron.profiler.client.stellar;
 
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.profiler.ProfileMeasurement;
-import org.apache.metron.profiler.client.ProfileWriter;
-import org.apache.metron.profiler.hbase.ColumnBuilder;
-import org.apache.metron.profiler.hbase.RowKeyBuilder;
-import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
-import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
+import org.apache.metron.profiler.client.ProfilerClient;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -41,180 +35,176 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.ENTITY_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.GROUPS_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_END_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PERIOD_START_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.PROFILE_KEY;
-import static org.apache.metron.profiler.client.stellar.VerboseProfile.VALUE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
- * Tests the VerboseProfile class.
+ * Tests the 'PROFILE_VERBOSE' function in the {@link VerboseProfile} class.
  */
 public class VerboseProfileTest {
-  private static final long periodDuration = 15;
-  private static final TimeUnit periodUnits = TimeUnit.MINUTES;
-  private static final int saltDivisor = 1000;
-  private static final String tableName = "profiler";
-  private static final String columnFamily = "P";
+
   private StellarStatefulExecutor executor;
-  private Map<String, Object> state;
-  private ProfileWriter profileWriter;
+  private FunctionResolver functionResolver;
+  private Map<String, Object> globals;
+  private VerboseProfile function;
+  private ProfilerClient profilerClient;
+  private List<Map<String, Object>> results;
+  private ProfileMeasurement expected;
 
-  private <T> T run(String expression, Class<T> clazz) {
-    return executor.execute(expression, state, clazz);
+  private List run(String expression) {
+    return executor.execute(expression, new HashMap<>(), List.class);
   }
 
-  private Map<String, Object> globals;
-
   @Before
   public void setup() {
-    state = new HashMap<>();
-    final HTableInterface table = MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
-    // used to write values to be read during testing
-    long periodDurationMillis = TimeUnit.MINUTES.toMillis(15);
-    RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
-    ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-    profileWriter = new ProfileWriter(rowKeyBuilder, columnBuilder, table, periodDurationMillis);
+    // the mock profiler client used to feed profile measurement values to the function
+    profilerClient = mock(ProfilerClient.class);
 
     // global properties
-    globals = new HashMap<String, Object>() {{
-      put(PROFILER_HBASE_TABLE.getKey(), tableName);
-      put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-      put(PROFILER_PERIOD.getKey(), Long.toString(periodDuration));
-      put(PROFILER_PERIOD_UNITS.getKey(), periodUnits.toString());
-      put(PROFILER_SALT_DIVISOR.getKey(), Integer.toString(saltDivisor));
-    }};
+    globals = new HashMap<>();
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
+            .build();
+
+    // the PROFILE_VERBOSE function that will be tested
+    function = new VerboseProfile(globals -> profilerClient);
+    function.initialize(context);
 
     // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(VerboseProfile.class)
-                    .withClass(FixedLookback.class),
-            new Context.Builder()
-                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> globals)
-                    .build());
-  }
+    functionResolver = new SimpleFunctionResolver()
+            .withClass(FixedLookback.class)
+            .withInstance(function);
+    executor = new DefaultStellarStatefulExecutor(functionResolver, context);
 
-  @Test
-  public void shouldReturnMeasurementsWhenNotGrouped() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
+    // create a profile measurement used in the tests
+    expected = new ProfileMeasurement()
             .withProfileName("profile1")
             .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // expect to see all values over the past 4 hours
-    List<Map<String, Object>> results;
-    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))", List.class);
-    Assert.assertEquals(count, results.size());
-    for(Map<String, Object> actual: results) {
-      Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
-      Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_START_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_END_KEY));
-      Assert.assertNotNull(actual.get(GROUPS_KEY));
-      Assert.assertEquals(expectedValue, actual.get(VALUE_KEY));
-    }
+            .withPeriod(System.currentTimeMillis(), 5, TimeUnit.MINUTES)
+            .withProfileValue(1231121);
   }
 
   @Test
-  public void shouldReturnMeasurementsWhenGrouped() {
-    final int periodsPerHour = 4;
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Arrays.asList("weekends");
-
-    // setup - write some measurements to be read later
-    final int count = hours * periodsPerHour;
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, count, group, val -> expectedValue);
-
-    // create a variable that contains the groups to use
-    state.put("groups", group);
-
-    // expect to see all values over the past 4 hours for the group
-    List<Map<String, Object>> results;
-    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), groups)", List.class);
-    Assert.assertEquals(count, results.size());
-    for(Map<String, Object> actual: results) {
-      Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
-      Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_START_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_END_KEY));
-      Assert.assertNotNull(actual.get(GROUPS_KEY));
-      Assert.assertEquals(expectedValue, actual.get(VALUE_KEY));
-    }
+  public void shouldRenderVerboseView() {
+    // only one profile measurement exists
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            any())).thenReturn(Arrays.asList(expected));
+
+    // expect the one measurement to be returned
+    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+    assertEquals(1, results.size());
+    Map<String, Object> actual = results.get(0);
+
+    // the measurement should be rendered as a map containing detailed information about the profile measurement
+    assertEquals(expected.getProfileName(),                 actual.get("profile"));
+    assertEquals(expected.getEntity(),                      actual.get("entity"));
+    assertEquals(expected.getPeriod().getPeriod(),          actual.get("period"));
+    assertEquals(expected.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(expected.getPeriod().getEndTimeMillis(),   actual.get("period.end"));
+    assertEquals(expected.getProfileValue(),                actual.get("value"));
+    assertEquals(expected.getGroups(),                      actual.get("groups"));
   }
 
   @Test
-  public void shouldReturnNothingWhenNoMeasurementsExist() {
-    final int expectedValue = 2302;
-    final int hours = 2;
-    final long startTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(hours);
-    final List<Object> group = Collections.emptyList();
+  public void shouldRenderVerboseViewWithGroup() {
+    // the profile measurement is part of a group
+    expected.withGroups(Arrays.asList("group1"));
+
+    // only one profile measurement exists
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            any())).thenReturn(Arrays.asList(expected));
+
+    // expect the one measurement to be returned
+    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group1'])");
+    assertEquals(1, results.size());
+    Map<String, Object> actual = results.get(0);
+
+    // the measurement should be rendered as a map containing detailed information about the profile measurement
+    assertEquals(expected.getProfileName(),                 actual.get("profile"));
+    assertEquals(expected.getEntity(),                      actual.get("entity"));
+    assertEquals(expected.getPeriod().getPeriod(),          actual.get("period"));
+    assertEquals(expected.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(expected.getPeriod().getEndTimeMillis(),   actual.get("period.end"));
+    assertEquals(expected.getProfileValue(),                actual.get("value"));
+    assertEquals(expected.getGroups(),                      actual.get("groups"));
+  }
 
-    // setup - write a single value from 2 hours ago
-    ProfileMeasurement m = new ProfileMeasurement()
-            .withProfileName("profile1")
-            .withEntity("entity1")
-            .withPeriod(startTime, periodDuration, periodUnits);
-    profileWriter.write(m, 1, group, val -> expectedValue);
+  @Test
+  public void shouldRenderVerboseViewWithDifferentGroup() {
+    // the profile measurement is part of a group
+    expected.withGroups(Arrays.asList("group1"));
+
+    // only one profile measurement exists
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            any())).thenReturn(Arrays.asList(expected));
+
+    // the profile measurement is not part of 'group999'
+    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'), ['group999'])");
+    assertEquals(0, results.size());
+  }
 
-    // expect to get NO measurements over the past 4 seconds
-    List<Map<String, Object>> result;
-    result = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'SECONDS'))", List.class);
-    Assert.assertEquals(0, result.size());
+  @Test
+  public void shouldReturnNothingWhenNoMeasurementsExist() {
+    // no measurements exist
+    when(profilerClient.fetch(
+            eq(Object.class),
+            any(),
+            any(),
+            any(),
+            any(),
+            any())).thenReturn(Collections.emptyList());
+
+    // expect an empty result since nothing was fetched
+    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+    assertEquals(0, results.size());
   }
 
   @Test
-  public void shouldReturnDefaultValueWhenNoMeasurementsExist() {
+  public void shouldReturnDefaultValue() {
     // set a default value
-    String defaultVal = "this is the default value";
-    globals.put("profiler.default.value", defaultVal);
-
-    // no profiles exist
-    String expr = "PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))";
-    List<Map<String, Object>> results = run(expr, List.class);
-
-    // expect to get the default value instead of no results
-    Assert.assertTrue(results.size() == 16 || results.size() == 17);
-    for(Map<String, Object> actual: results) {
-      Assert.assertEquals("profile1", actual.get(PROFILE_KEY));
-      Assert.assertEquals("entity1", actual.get(ENTITY_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_START_KEY));
-      Assert.assertNotNull(actual.get(PERIOD_END_KEY));
-      Assert.assertNotNull(actual.get(GROUPS_KEY));
-
-      // expect the default value
-      Assert.assertEquals(defaultVal, actual.get(VALUE_KEY));
-    }
-
+    globals.put("profiler.default.value", expected);
+
+    // the underlying profile client needs to be setup to return the default
+    when(profilerClient.fetch(
+            eq(Object.class),
+            eq(expected.getProfileName()),
+            eq(expected.getEntity()),
+            eq(expected.getGroups()),
+            any(),
+            eq(Optional.of(expected)))).thenReturn(Arrays.asList(expected));
+
+    results = run("PROFILE_VERBOSE('profile1', 'entity1', PROFILE_FIXED(4, 'HOURS'))");
+    assertEquals(1, results.size());
+    Map<String, Object> actual = results.get(0);
+
+    // no measurements exist, but we expect the default value to be returned
+    assertEquals(expected.getProfileName(),                 actual.get("profile"));
+    assertEquals(expected.getEntity(),                      actual.get("entity"));
+    assertEquals(expected.getPeriod().getPeriod(),          actual.get("period"));
+    assertEquals(expected.getPeriod().getStartTimeMillis(), actual.get("period.start"));
+    assertEquals(expected.getPeriod().getEndTimeMillis(),   actual.get("period.end"));
+    assertEquals(expected.getProfileValue(),                actual.get("value"));
+    assertEquals(expected.getGroups(),                      actual.get("groups"));
   }
 }
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
index 3f889bc..5a9610e 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/SaltyRowKeyBuilder.java
@@ -241,10 +241,18 @@ public class SaltyRowKeyBuilder implements RowKeyBuilder {
     }
   }
 
+  public long getPeriodDurationMillis() {
+    return periodDurationMillis;
+  }
+
   public void withPeriodDuration(long duration, TimeUnit units) {
     periodDurationMillis = units.toMillis(duration);
   }
 
+  public int getSaltDivisor() {
+    return saltDivisor;
+  }
+
   public void setSaltDivisor(int saltDivisor) {
     this.saltDivisor = saltDivisor;
   }
diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
index 0a4a99d..b580db8 100644
--- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
+++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/hbase/ValueOnlyColumnBuilder.java
@@ -34,7 +34,6 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
    * The column family storing the profile data.
    */
   private String columnFamily;
-
   private byte[] columnFamilyBytes;
 
   public ValueOnlyColumnBuilder() {
@@ -47,7 +46,6 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
 
   @Override
   public ColumnList columns(ProfileMeasurement measurement) {
-
     ColumnList cols = new ColumnList();
     cols.addColumn(columnFamilyBytes, getColumnQualifier("value"), SerDeUtils.toBytes(measurement.getProfileValue()));
 
@@ -66,7 +64,6 @@ public class ValueOnlyColumnBuilder implements ColumnBuilder {
 
   @Override
   public byte[] getColumnQualifier(String fieldName) {
-
     if("value".equals(fieldName)) {
       return Bytes.toBytes("value");
     }
diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index e04404c..31b41ff 100644
--- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -21,7 +21,6 @@
 package org.apache.metron.profiler;
 
 import com.github.benmanes.caffeine.cache.Ticker;
-import com.google.common.util.concurrent.MoreExecutors;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.utils.JSONUtils;
@@ -33,7 +32,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.HOURS;
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index b046cc4..0cd811d 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -145,6 +145,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-integration-test</artifactId>
             <version>${project.parent.version}</version>
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
index 571545e..d3cfdca 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfiler.java
@@ -28,6 +28,7 @@ import org.apache.metron.profiler.spark.function.MessageRouterFunction;
 import org.apache.metron.profiler.spark.function.ProfileBuilderFunction;
 import org.apache.metron.profiler.spark.reader.TelemetryReader;
 import org.apache.metron.profiler.spark.reader.TelemetryReaders;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.SparkSession;
@@ -97,8 +98,11 @@ public class BatchProfiler implements Serializable {
     LOG.debug("Produced {} profile measurement(s)", measurements.cache().count());
 
     // write the profile measurements to HBase
+    MapPartitionsFunction<ProfileMeasurementAdapter, Integer> mapper = new HBaseWriterFunction.Builder()
+            .withProperties(profilerProps)
+            .build();
     long count = measurements
-            .mapPartitions(new HBaseWriterFunction(profilerProps), Encoders.INT())
+            .mapPartitions(mapper, Encoders.INT())
             .agg(sum("value"))
             .head()
             .getLong(0);
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
index 148d970..d120a29 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/BatchProfilerConfig.java
@@ -20,13 +20,14 @@
 package org.apache.metron.profiler.spark;
 
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 
 import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.metron.profiler.spark.reader.TelemetryReaders.JSON;
-import static org.apache.metron.profiler.spark.reader.TelemetryReaders.TEXT;
 
 /**
  * Defines the configuration values recognized by the Batch Profiler.
@@ -39,7 +40,9 @@ public enum BatchProfilerConfig {
 
   HBASE_SALT_DIVISOR("profiler.hbase.salt.divisor", 1000, Integer.class),
 
-  HBASE_TABLE_PROVIDER("profiler.hbase.table.provider", "org.apache.metron.hbase.HTableProvider", String.class),
+  HBASE_CONNECTION_FACTORY("profiler.hbase.connection.provider", HBaseConnectionFactory.class.getName(), String.class),
+
+  HBASE_CLIENT_FACTORY("profiler.hbase.client.factory", HBaseTableClientFactory.class.getName(), String.class), // class name, since the value type is String
 
   HBASE_TABLE_NAME("profiler.hbase.table", "profiler", String.class),
 
diff --git a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
index 6a090cf..bb66af8 100644
--- a/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
+++ b/metron-analytics/metron-profiler-spark/src/main/java/org/apache/metron/profiler/spark/function/HBaseWriterFunction.java
@@ -21,12 +21,13 @@ package org.apache.metron.profiler.spark.function;
 
 import org.apache.commons.collections4.IteratorUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
 import org.apache.metron.profiler.ProfileMeasurement;
 import org.apache.metron.profiler.hbase.ColumnBuilder;
 import org.apache.metron.profiler.hbase.RowKeyBuilder;
@@ -39,16 +40,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CLIENT_FACTORY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CONNECTION_FACTORY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_SALT_DIVISOR;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_WRITE_DURABILITY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.PERIOD_DURATION_UNITS;
@@ -60,7 +61,93 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
 
   protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private TableProvider tableProvider;
+  public static class Builder {
+    private HBaseConnectionFactory connectionFactory = new HBaseConnectionFactory();
+    private HBaseClientFactory hBaseClientFactory = new HBaseTableClientFactory();
+    private String tableName = String.class.cast(HBASE_TABLE_NAME.getDefault());
+    private Durability durability = Durability.class.cast(HBASE_WRITE_DURABILITY.getDefault());
+    private RowKeyBuilder rowKeyBuilder = new SaltyRowKeyBuilder();
+    private ColumnBuilder columnBuilder = new ValueOnlyColumnBuilder();
+
+    public Builder withConnectionFactory(HBaseConnectionFactory connectionFactory) {
+      this.connectionFactory = connectionFactory;
+      return this;
+    }
+
+    public Builder withClientFactory(HBaseClientFactory clientFactory) {
+      this.hBaseClientFactory = clientFactory;
+      return this;
+    }
+
+    public Builder withRowKeyBuilder(RowKeyBuilder rowKeyBuilder) {
+      this.rowKeyBuilder = rowKeyBuilder;
+      return this;
+    }
+
+    public Builder withColumnBuilder(ColumnBuilder columnBuilder) {
+      this.columnBuilder = columnBuilder;
+      return this;
+    }
+
+    public Builder withProperties(Properties properties) {
+      // row key builder
+      int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
+      int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
+      TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
+      rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);
+
+      // column builder
+      String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
+      columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
+
+      // hbase
+      tableName = HBASE_TABLE_NAME.get(properties, String.class);
+      durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);
+
+      // connection factory
+      String factoryImpl = HBASE_CONNECTION_FACTORY.get(properties, String.class);
+      connectionFactory = createConnectionFactory(factoryImpl);
+
+      // client creator
+      String creatorImpl = HBASE_CLIENT_FACTORY.get(properties, String.class);
+      hBaseClientFactory = HBaseClientFactory.byName(creatorImpl, () -> new HBaseTableClientFactory());
+
+      return this;
+    }
+
+    private static HBaseConnectionFactory createConnectionFactory(String factoryImpl) {
+      LOG.trace("Creating table provider; className={}", factoryImpl);
+
+      // if the class name is empty or starts with '$', fall back to the default connection factory
+      if(StringUtils.isEmpty(factoryImpl) || factoryImpl.charAt(0) == '$') {
+        return new HBaseConnectionFactory();
+      }
+
+      // instantiate the connection factory
+      return HBaseConnectionFactory.byName(factoryImpl);
+    }
+
+    public HBaseWriterFunction build() {
+      HBaseWriterFunction function = new HBaseWriterFunction();
+      function.connectionFactory = connectionFactory;
+      function.hBaseClientFactory = hBaseClientFactory;
+      function.tableName = tableName;
+      function.durability = durability;
+      function.rowKeyBuilder = rowKeyBuilder;
+      function.columnBuilder = columnBuilder;
+      return function;
+    }
+  }
+
+  /**
+   * Establishes connections to HBase.
+   */
+  private HBaseConnectionFactory connectionFactory;
+
+  /**
+   * Creates the {@link HBaseTableClient} when it is needed.
+   */
+  private HBaseClientFactory hBaseClientFactory;
 
   /**
    * The name of the HBase table to write to.
@@ -82,23 +169,11 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
    */
   private ColumnBuilder columnBuilder;
 
-  public HBaseWriterFunction(Properties properties) {
-    tableName = HBASE_TABLE_NAME.get(properties, String.class);
-    durability = HBASE_WRITE_DURABILITY.get(properties, Durability.class);
-
-    // row key builder
-    int saltDivisor = HBASE_SALT_DIVISOR.get(properties, Integer.class);
-    int periodDuration = PERIOD_DURATION.get(properties, Integer.class);
-    TimeUnit periodDurationUnits = TimeUnit.valueOf(PERIOD_DURATION_UNITS.get(properties, String.class));
-    rowKeyBuilder = new SaltyRowKeyBuilder(saltDivisor, periodDuration, periodDurationUnits);
-
-    // column builder
-    String columnFamily = HBASE_COLUMN_FAMILY.get(properties, String.class);
-    columnBuilder = new ValueOnlyColumnBuilder(columnFamily);
-
-    // hbase table provider
-    String providerImpl = HBASE_TABLE_PROVIDER.get(properties, String.class);
-    tableProvider = createTableProvider(providerImpl);
+  /**
+   * Use the {@link HBaseWriterFunction.Builder} instead.
+   */
+  private HBaseWriterFunction() {
+    // nothing to do
   }
 
   /**
@@ -117,8 +192,7 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
     if(measurements.size() > 0) {
 
       // open an HBase connection
-      Configuration config = HBaseConfiguration.create();
-      try (LegacyHBaseClient client = new LegacyHBaseClient(tableProvider, config, tableName)) {
+      try (HBaseClient client = hBaseClientFactory.create(connectionFactory, HBaseConfiguration.create(), tableName)) {
 
         for (ProfileMeasurementAdapter adapter : measurements) {
           ProfileMeasurement m = adapter.toProfileMeasurement();
@@ -135,37 +209,4 @@ public class HBaseWriterFunction implements MapPartitionsFunction<ProfileMeasure
     LOG.debug("{} profile measurement(s) written to HBase", count);
     return IteratorUtils.singletonIterator(count);
   }
-
-  /**
-   * Set the {@link TableProvider} using the class name of the provider.
-   * @param providerImpl The name of the class.
-   * @return
-   */
-  public HBaseWriterFunction withTableProviderImpl(String providerImpl) {
-    this.tableProvider = createTableProvider(providerImpl);
-    return this;
-  }
-
-  /**
-   * Creates a TableProvider based on a class name.
-   * @param providerImpl The class name of a TableProvider
-   */
-  private static TableProvider createTableProvider(String providerImpl) {
-    LOG.trace("Creating table provider; className={}", providerImpl);
-
-    // if class name not defined, use a reasonable default
-    if(StringUtils.isEmpty(providerImpl) || providerImpl.charAt(0) == '$') {
-      return new HTableProvider();
-    }
-
-    // instantiate the table provider
-    try {
-      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(providerImpl);
-      return clazz.getConstructor().newInstance();
-
-    } catch (InstantiationException | IllegalAccessException | IllegalStateException |
-            InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
-      throw new IllegalStateException("Unable to instantiate connector", e);
-    }
-  }
 }
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
index b36cf8c..673a0b6 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/BatchProfilerIntegrationTest.java
@@ -20,10 +20,16 @@
 package org.apache.metron.profiler.spark;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
+import org.apache.metron.profiler.client.HBaseProfilerClient;
+import org.apache.metron.profiler.client.ProfilerClient;
 import org.apache.metron.profiler.client.stellar.FixedLookback;
 import org.apache.metron.profiler.client.stellar.GetProfile;
 import org.apache.metron.profiler.client.stellar.WindowLookback;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
@@ -46,14 +52,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.metron.common.configuration.profiler.ProfilerConfig.fromJSON;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_CONNECTION_FACTORY;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CLIENT_FACTORY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_PROVIDER;
+import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_CONNECTION_FACTORY;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_BEGIN;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_END;
 import static org.apache.metron.profiler.spark.BatchProfilerConfig.TELEMETRY_INPUT_FORMAT;
@@ -101,28 +109,40 @@ public class BatchProfilerIntegrationTest {
   public void setup() {
     readerProperties = new Properties();
     profilerProperties = new Properties();
-
-    // the output will be written to a mock HBase table
     String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
     String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
-    profilerProperties.put(HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
-
-    // create the mock hbase table
-    MockHBaseTableProvider.addToCache(tableName, columnFamily);
+    profilerProperties.put(HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
 
     // define the globals required by `PROFILE_GET`
     Map<String, Object> global = new HashMap<String, Object>() {{
       put(PROFILER_HBASE_TABLE.getKey(), tableName);
       put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
+      put(PROFILER_HBASE_CONNECTION_FACTORY.getKey(), FakeHBaseConnectionFactory.class.getName());
     }};
 
+    // the batch profiler needs to use the `FakeHBaseClient` for these tests
+    profilerProperties.put(HBASE_CLIENT_FACTORY.getKey(), FakeHBaseClientFactory.class.getName());
+
+    // ensure that all of the static records are deleted before running the test
+    FakeHBaseClient hbaseClient = new FakeHBaseClient();
+    hbaseClient.deleteAll();
+
+    // create a `ProfilerClient` that uses the `FakeHBaseClient`
+    ProfilerClient profilerClient = new HBaseProfilerClient(
+            hbaseClient,
+            new SaltyRowKeyBuilder(),
+            new ValueOnlyColumnBuilder(),
+            TimeUnit.MINUTES.toMillis(15));
+
+    // create an instance of `PROFILE_GET` that indirectly uses the `FakeHBaseClient`
+    GetProfile profileGetFunction = new GetProfile(globals -> profilerClient);
+
     // create the stellar execution environment
     executor = new DefaultStellarStatefulExecutor(
             new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
                     .withClass(FixedLookback.class)
-                    .withClass(WindowLookback.class),
+                    .withClass(WindowLookback.class)
+                    .withInstance(profileGetFunction),
             new Context.Builder()
                     .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
                     .build());
diff --git a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
index 55f3e21..c0f0370 100644
--- a/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
+++ b/metron-analytics/metron-profiler-spark/src/test/java/org/apache/metron/profiler/spark/function/HBaseWriterFunctionTest.java
@@ -21,8 +21,14 @@ package org.apache.metron.profiler.spark.function;
 
 import org.apache.commons.collections4.IteratorUtils;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.ColumnList;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
 import org.apache.metron.profiler.ProfileMeasurement;
+import org.apache.metron.profiler.hbase.ColumnBuilder;
+import org.apache.metron.profiler.hbase.RowKeyBuilder;
+import org.apache.metron.profiler.hbase.SaltyRowKeyBuilder;
+import org.apache.metron.profiler.hbase.ValueOnlyColumnBuilder;
 import org.apache.metron.profiler.spark.ProfileMeasurementAdapter;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
@@ -32,64 +38,71 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_COLUMN_FAMILY;
-import static org.apache.metron.profiler.spark.BatchProfilerConfig.HBASE_TABLE_NAME;
+import static org.apache.metron.hbase.client.FakeHBaseClient.Mutation;
 
 public class HBaseWriterFunctionTest {
 
-  Properties profilerProperties;
+  private HBaseWriterFunction function;
+  private RowKeyBuilder rowKeyBuilder;
+  private ColumnBuilder columnBuilder;
+  private FakeHBaseClient hbaseClient;
+  private HBaseClientFactory hBaseClientFactory;
+
+  private static final JSONObject message = getMessage();
+  private static final String entity = (String) message.get("ip_src_addr");
+  private static final long timestamp = (Long) message.get("timestamp");
+  private static final ProfileConfig profile = getProfile();
 
   @Before
   public void setup() {
-    profilerProperties = getProfilerProperties();
-
-    // create a mock table for HBase
-    String tableName = HBASE_TABLE_NAME.get(profilerProperties, String.class);
-    String columnFamily = HBASE_COLUMN_FAMILY.get(profilerProperties, String.class);
-    MockHBaseTableProvider.addToCache(tableName, columnFamily);
+    hbaseClient = new FakeHBaseClient();
+    hbaseClient.deleteAll();
+    hBaseClientFactory = (x, y, z) -> hbaseClient;
+    rowKeyBuilder = new SaltyRowKeyBuilder();
+    columnBuilder = new ValueOnlyColumnBuilder();
+    function = new HBaseWriterFunction.Builder()
+            .withRowKeyBuilder(rowKeyBuilder)
+            .withColumnBuilder(columnBuilder)
+            .withClientFactory(hBaseClientFactory)
+            .build();
   }
 
   @Test
   public void testWrite() throws Exception {
-
-    JSONObject message = getMessage();
-    String entity = (String) message.get("ip_src_addr");
-    long timestamp = (Long) message.get("timestamp");
-    ProfileConfig profile = getProfile();
-
-    // setup the profile measurements that will be written
+    // write a profile measurement
     List<ProfileMeasurementAdapter> measurements = createMeasurements(1, entity, timestamp, profile);
-
-    // setup the function to test
-    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
-    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
-
-    // write the measurements
     Iterator<Integer> results = function.call(measurements.iterator());
 
-    // validate the result
+    // validate the results
     List<Integer> counts = IteratorUtils.toList(results);
     Assert.assertEquals(1, counts.size());
     Assert.assertEquals(1, counts.get(0).intValue());
+
+    // 1 record should have been written to the hbase client
+    List<Mutation> written = hbaseClient.getAllPersisted();
+    Assert.assertEquals(1, written.size());
+
+    // validate the row key used to write to hbase
+    ProfileMeasurement m = measurements.get(0).toProfileMeasurement();
+    byte[] expectedRowKey = rowKeyBuilder.rowKey(m);
+    Assert.assertArrayEquals(expectedRowKey, written.get(0).rowKey);
+
+    // validate the columns used to write to hbase.
+    ColumnList expectedCols = columnBuilder.columns(m);
+    Assert.assertEquals(expectedCols, written.get(0).columnList);
   }
 
   @Test
   public void testWriteMany() throws Exception {
-
-    JSONObject message = getMessage();
-    String entity = (String) message.get("ip_src_addr");
-    long timestamp = (Long) message.get("timestamp");
-    ProfileConfig profile = getProfile();
-
     // setup the profile measurements that will be written
     List<ProfileMeasurementAdapter> measurements = createMeasurements(10, entity, timestamp, profile);
 
     // setup the function to test
-    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
-    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+    HBaseWriterFunction function = new HBaseWriterFunction.Builder()
+            .withClientFactory(hBaseClientFactory)
+            .build();
 
     // write the measurements
     Iterator<Integer> results = function.call(measurements.iterator());
@@ -102,13 +115,13 @@ public class HBaseWriterFunctionTest {
 
   @Test
   public void testWriteNone() throws Exception {
-
     // there are no profile measurements to write
     List<ProfileMeasurementAdapter> measurements = new ArrayList<>();
 
     // setup the function to test
-    HBaseWriterFunction function = new HBaseWriterFunction(profilerProperties);
-    function.withTableProviderImpl(MockHBaseTableProvider.class.getName());
+    HBaseWriterFunction function = new HBaseWriterFunction.Builder()
+            .withClientFactory(hBaseClientFactory)
+            .build();
 
     // write the measurements
     Iterator<Integer> results = function.call(measurements.iterator());
@@ -147,7 +160,7 @@ public class HBaseWriterFunctionTest {
   /**
    * Returns a telemetry message to use for testing.
    */
-  private JSONObject getMessage() {
+  private static JSONObject getMessage() {
     JSONObject message = new JSONObject();
     message.put("ip_src_addr", "192.168.1.1");
     message.put("status", "red");
@@ -156,16 +169,9 @@ public class HBaseWriterFunctionTest {
   }
 
   /**
-   * Returns profiler properties to use for testing.
-   */
-  private Properties getProfilerProperties() {
-    return new Properties();
-  }
-
-  /**
    * Returns a profile definition to use for testing.
    */
-  private ProfileConfig getProfile() {
+  private static ProfileConfig getProfile() {
     return new ProfileConfig()
             .withProfile("profile1")
             .withForeach("ip_src_addr")
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index 09bd238..d71dfc4 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -29,6 +29,16 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
             <version>${guava_version}</version>
diff --git a/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties b/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
index dc30838..91a5d3b 100644
--- a/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
+++ b/metron-analytics/metron-profiler-storm/src/main/config/profiler.properties
@@ -61,6 +61,8 @@ profiler.hbase.table=profiler
 profiler.hbase.column.family=P
 profiler.hbase.batch=10
 profiler.hbase.flush.interval.seconds=30
+profiler.hbase.client.factory=org.apache.metron.hbase.client.HBaseTableClientFactory
+profiler.hbase.connection.factory=org.apache.metron.hbase.client.HBaseConnectionFactory
 
 ##### Kafka #####
 
diff --git a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
index e16a782..b3faf34 100644
--- a/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
+++ b/metron-analytics/metron-profiler-storm/src/main/flux/profiler/remote.yaml
@@ -137,6 +137,9 @@ components:
             - ${profiler.window.lag}
             - "${profiler.window.lag.units}"
 
+    -   id: "hbaseClientFactory"
+        className: "${profiler.hbase.client.factory}"
+
 spouts:
 
     -   id: "kafkaSpout"
@@ -179,8 +182,10 @@ bolts:
             - "${profiler.hbase.table}"
             - ref: "hbaseMapper"
         configMethods:
-            - name: "withTableProvider"
-              args: ["${hbase.provider.impl}"]
+            - name: "withClientFactory"
+              args: [ref: "hbaseClientFactory"]
+            - name: "withConnectionFactory"
+              args: ["${profiler.hbase.connection.factory}"]
             - name: "withBatchSize"
               args: [${profiler.hbase.batch}]
             - name: "withFlushIntervalSecs"
diff --git a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
index 07bd552..d5c2949 100644
--- a/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
+++ b/metron-analytics/metron-profiler-storm/src/main/java/org/apache/metron/hbase/bolt/HBaseBolt.java
@@ -20,18 +20,15 @@
 
 package org.apache.metron.hbase.bolt;
 
-import java.lang.invoke.MethodHandles;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Durability;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
 import org.apache.metron.hbase.ColumnList;
 import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.hbase.client.HBaseClientFactory;
+import org.apache.metron.hbase.client.HBaseConnectionFactory;
+import org.apache.metron.hbase.client.HBaseTableClient;
+import org.apache.metron.hbase.client.HBaseTableClientFactory;
 import org.apache.storm.Config;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -41,6 +38,10 @@ import org.apache.storm.tuple.Tuple;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.Optional;
+
 /**
  * A bolt that writes to HBase.
  *
@@ -76,25 +77,32 @@ public class HBaseBolt extends BaseRichBolt {
   protected HBaseMapper mapper;
 
   /**
-   * The name of the class that should be used as a table provider.
-   *
-   * <p>Defaults to 'org.apache.metron.hbase.HTableProvider'.
+   * Defines when a batch needs to be flushed.
    */
-  protected String tableProviderClazzName = "org.apache.metron.hbase.HTableProvider";
+  private BatchHelper batchHelper;
 
   /**
-   * The TableProvider
-   * May be loaded from tableProviderClazzName or provided
+   * Establishes a connection to HBase.
    */
-  protected TableProvider tableProvider;
+  private HBaseConnectionFactory connectionFactory;
+
+  /**
+   * Creates the {@link HBaseTableClient} used by this bolt.
+   */
+  private HBaseClientFactory hBaseClientFactory;
+
+  /**
+   * Used to write to HBase.
+   */
+  protected transient HBaseClient hbaseClient;
 
-  private BatchHelper batchHelper;
   protected OutputCollector collector;
-  protected transient LegacyHBaseClient hbaseClient;
 
   public HBaseBolt(String tableName, HBaseMapper mapper) {
     this.tableName = tableName;
     this.mapper = mapper;
+    this.connectionFactory = new HBaseConnectionFactory();
+    this.hBaseClientFactory = new HBaseTableClientFactory();
   }
 
   public HBaseBolt writeToWAL(boolean writeToWAL) {
@@ -102,34 +110,34 @@ public class HBaseBolt extends BaseRichBolt {
     return this;
   }
 
-  public HBaseBolt withTableProvider(String tableProvider) {
-    this.tableProviderClazzName = tableProvider;
+  public HBaseBolt withBatchSize(int batchSize) {
+    this.batchSize = batchSize;
     return this;
   }
 
-  public HBaseBolt withTableProviderInstance(TableProvider tableProvider){
-    this.tableProvider = tableProvider;
+  public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
+    this.flushIntervalSecs = flushIntervalSecs;
     return this;
   }
 
-  public HBaseBolt withBatchSize(int batchSize) {
-    this.batchSize = batchSize;
+  public HBaseBolt withConnectionFactory(HBaseConnectionFactory connectionFactory) {
+    this.connectionFactory = connectionFactory;
     return this;
   }
 
-  public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
-    this.flushIntervalSecs = flushIntervalSecs;
+  public HBaseBolt withConnectionFactory(String connectionFactoryImpl) {
+    this.connectionFactory = HBaseConnectionFactory.byName(connectionFactoryImpl);
     return this;
   }
 
-  public void setClient(LegacyHBaseClient hbaseClient) {
-    this.hbaseClient = hbaseClient;
+  public HBaseBolt withClientFactory(HBaseClientFactory clientFactory) {
+    this.hBaseClientFactory = clientFactory;
+    return this;
   }
 
   @Override
   public Map<String, Object> getComponentConfiguration() {
     LOG.debug("Tick tuples expected every {} second(s)", flushIntervalSecs);
-
     Config conf = new Config();
     conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, flushIntervalSecs);
     return conf;
@@ -139,15 +147,7 @@ public class HBaseBolt extends BaseRichBolt {
   public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
     this.collector = collector;
     this.batchHelper = new BatchHelper(batchSize, collector);
-
-    TableProvider provider;
-    if(this.tableProvider == null) {
-      provider = createTableProvider(tableProviderClazzName);
-    } else {
-      provider = this.tableProvider;
-    }
-
-    hbaseClient = new LegacyHBaseClient(provider, HBaseConfiguration.create(), tableName);
+    this.hbaseClient = hBaseClientFactory.create(connectionFactory, HBaseConfiguration.create(), tableName);
   }
 
   @Override
@@ -158,12 +158,10 @@ public class HBaseBolt extends BaseRichBolt {
   @Override
   public void execute(Tuple tuple) {
     LOG.trace("Received a tuple.");
-
     try {
       if (batchHelper.shouldHandle(tuple)) {
         save(tuple);
       }
-
       if (batchHelper.shouldFlush()) {
         flush();
       }
@@ -182,7 +180,6 @@ public class HBaseBolt extends BaseRichBolt {
     byte[] rowKey = mapper.rowKey(tuple);
     ColumnList cols = mapper.columns(tuple);
     Durability durability = writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL;
-
     Optional<Long> ttl = mapper.getTTL(tuple);
     if(ttl.isPresent()) {
       hbaseClient.addMutation(rowKey, cols, durability, ttl.get());
@@ -199,31 +196,7 @@ public class HBaseBolt extends BaseRichBolt {
    */
   private void flush() {
     LOG.debug("About to flush a batch of {} mutation(s)", batchHelper.getBatchSize());
-
     this.hbaseClient.mutate();
     batchHelper.ack();
   }
-
-  /**
-   * Creates a TableProvider based on a class name.
-   * @param connectorImpl The class name of a TableProvider
-   */
-  private static TableProvider createTableProvider(String connectorImpl) {
-    LOG.trace("Creating table provider; className={}", connectorImpl);
-
-    // if class name not defined, use a reasonable default
-    if(StringUtils.isEmpty(connectorImpl) || connectorImpl.charAt(0) == '$') {
-      return new HTableProvider();
-    }
-
-    // instantiate the table provider
-    try {
-      Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(connectorImpl);
-      return clazz.getConstructor().newInstance();
-
-    } catch (InstantiationException | IllegalAccessException | IllegalStateException |
-              InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
-      throw new IllegalStateException("Unable to instantiate connector", e);
-    }
-  }
 }
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
index 9146aff..b5bf898 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/HBaseBoltTest.java
@@ -20,13 +20,10 @@
 
 package org.apache.metron.hbase.bolt;
 
-import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.client.HBaseClient;
+import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.storm.Constants;
 import org.apache.storm.tuple.Tuple;
-import org.apache.metron.hbase.bolt.mapper.Widget;
-import org.apache.metron.hbase.bolt.mapper.WidgetMapper;
-import org.apache.metron.hbase.client.LegacyHBaseClient;
-import org.apache.metron.test.bolt.BaseBoltTest;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,12 +45,11 @@ import static org.mockito.Mockito.when;
 public class HBaseBoltTest extends BaseBoltTest {
 
   private static final String tableName = "widgets";
-  private LegacyHBaseClient client;
+  private HBaseClient client;
   private Tuple tuple1;
   private Tuple tuple2;
   private Widget widget1;
   private Widget widget2;
-  private TableProvider provider;
 
   @Before
   public void setupTuples() throws Exception {
@@ -68,11 +64,10 @@ public class HBaseBoltTest extends BaseBoltTest {
   }
 
   @Before
-  public void setup() throws Exception {
+  public void setup() {
     tuple1 = mock(Tuple.class);
     tuple2 = mock(Tuple.class);
-    client = mock(LegacyHBaseClient.class);
-    provider = mock(TableProvider.class);
+    client = mock(HBaseClient.class);
   }
 
   /**
@@ -80,9 +75,9 @@ public class HBaseBoltTest extends BaseBoltTest {
    */
   private HBaseBolt createBolt(int batchSize, WidgetMapper mapper) throws IOException {
     HBaseBolt bolt = new HBaseBolt(tableName, mapper)
-            .withBatchSize(batchSize).withTableProviderInstance(provider);
+            .withBatchSize(batchSize)
+            .withClientFactory((f, c, t) -> client);
     bolt.prepare(Collections.emptyMap(), topologyContext, outputCollector);
-    bolt.setClient(client);
     return bolt;
   }
 
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/Widget.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/Widget.java
new file mode 100644
index 0000000..e25d4d5
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/Widget.java
@@ -0,0 +1,84 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt;
+
+/**
+ * A simple POJO used for testing.
+ */
+public class Widget {
+
+  /** The name of the widget. */
+  private String name;
+
+  /** The cost of the widget. */
+  private int cost;
+
+  public Widget(String name, int cost) {
+    this.cost = cost;
+    this.name = name;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public int getCost() {
+    return this.cost;
+  }
+
+  public void setCost(int cost) {
+    this.cost = cost;
+  }
+
+  /**
+   * Widgets are equal when they are of the same class and share the same name and cost.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Widget that = (Widget) o;
+    boolean sameName = (name == null) ? that.name == null : name.equals(that.name);
+    return cost == that.cost && sameName;
+  }
+
+  @Override
+  public int hashCode() {
+    int nameHash = (name == null) ? 0 : name.hashCode();
+    return 31 * nameHash + cost;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("Widget{");
+    text.append("name='").append(name).append('\'');
+    text.append(", cost=").append(cost).append('}');
+    return text.toString();
+  }
+}
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/WidgetMapper.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/WidgetMapper.java
new file mode 100644
index 0000000..ac6b561
--- /dev/null
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/hbase/bolt/WidgetMapper.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.metron.hbase.bolt;
+
+import org.apache.metron.hbase.bolt.mapper.HBaseMapper;
+import org.apache.storm.tuple.Tuple;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.metron.hbase.ColumnList;
+
+import java.util.Optional;
+
+
+/**
+ * Maps a Widget to HBase.  Used only for testing.
+ */
+public class WidgetMapper implements HBaseMapper {
+
+  public static final String CF_STRING = "cfWidget";
+  public static final byte[] CF = Bytes.toBytes(CF_STRING);
+  public static final byte[] QNAME = Bytes.toBytes("qName");
+  public static final byte[] QCOST = Bytes.toBytes("qCost");
+
+  /** The time-to-live reported by getTTL; null when none was configured. */
+  private final Long ttl;
+
+  public WidgetMapper() {
+    this(null);
+  }
+
+  public WidgetMapper(Long ttl) {
+    this.ttl = ttl;  // null is accepted and means that no time-to-live is set
+  }
+
+  @Override
+  public byte[] rowKey(Tuple tuple) {
+    Widget w = (Widget) tuple.getValueByField("widget");  // the "widget" field holds the Widget
+    return Bytes.toBytes(w.getName());
+  }
+
+  @Override
+  public ColumnList columns(Tuple tuple) {
+    Widget w = (Widget) tuple.getValueByField("widget");
+    ColumnList cols = new ColumnList();
+    cols.addColumn(CF, QNAME, Bytes.toBytes(w.getName()));
+    cols.addColumn(CF, QCOST, Bytes.toBytes(w.getCost()));
+    return cols;
+  }
+
+  @Override
+  public Optional<Long> getTTL(Tuple tuple) {
+    return Optional.ofNullable(ttl);  // empty when no time-to-live was configured
+  }
+}
diff --git a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
index ea4ad4e..a1bd458 100644
--- a/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
+++ b/metron-analytics/metron-profiler-storm/src/test/java/org/apache/metron/profiler/storm/integration/ProfilerIntegrationTest.java
@@ -24,14 +24,16 @@ import org.adrianwalker.multilinestring.Multiline;
 import org.apache.commons.io.FileUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.profiler.ProfilerConfig;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.client.FakeHBaseClient;
+import org.apache.metron.hbase.client.FakeHBaseClientFactory;
+import org.apache.metron.hbase.client.FakeHBaseConnectionFactory;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.FluxTopologyComponent;
 import org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.metron.profiler.client.HBaseProfilerClientFactory;
 import org.apache.metron.profiler.client.stellar.FixedLookback;
 import org.apache.metron.profiler.client.stellar.GetProfile;
 import org.apache.metron.profiler.client.stellar.WindowLookback;
@@ -39,6 +41,7 @@ import org.apache.metron.statistics.OnlineStatisticsProvider;
 import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
 import org.apache.metron.stellar.common.StellarStatefulExecutor;
 import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
 import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
 import org.apache.storm.Config;
 import org.json.simple.JSONObject;
@@ -64,7 +67,6 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.metron.integration.utils.TestUtils.assertEventually;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_COLUMN_FAMILY;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE;
-import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_HBASE_TABLE_PROVIDER;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_PERIOD_UNITS;
 import static org.apache.metron.profiler.client.stellar.ProfilerClientConfig.PROFILER_SALT_DIVISOR;
@@ -109,7 +111,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   private static KafkaComponent kafkaComponent;
   private static ConfigUploadComponent configUploadComponent;
   private static ComponentRunner runner;
-  private static MockHTable profilerTable;
   private static String message1;
   private static String message2;
   private static String message3;
@@ -393,7 +394,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
   @BeforeClass
   public static void setupBeforeClass() throws UnableToStartException {
-
     // create some messages that contain a timestamp - a really old timestamp; close to 1970
     message1 = getMessage(entity, startAt);
     message2 = getMessage(entity, startAt + 100);
@@ -428,7 +428,8 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("profiler.hbase.column.family", columnFamily);
       setProperty("profiler.hbase.batch", "10");
       setProperty("profiler.hbase.flush.interval.seconds", "1");
-      setProperty("hbase.provider.impl", "" + MockHBaseTableProvider.class.getName());
+      setProperty("profiler.hbase.connection.factory", FakeHBaseConnectionFactory.class.getName());
+      setProperty("profiler.hbase.client.factory", FakeHBaseClientFactory.class.getName());
 
       // profile settings
       setProperty("profiler.period.duration", Long.toString(periodDurationMillis));
@@ -442,9 +443,6 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       setProperty("profiler.max.routes.per.bolt", Long.toString(maxRoutesPerBolt));
     }};
 
-    // create the mock table
-    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
     zkComponent = getZKServerComponent(topologyProperties);
 
     // create the input and output topics
@@ -477,8 +475,7 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
   }
 
   @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    MockHBaseTableProvider.clear();
+  public static void tearDownAfterClass() {
     if (runner != null) {
       runner.stop();
     }
@@ -486,14 +483,10 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
 
   @Before
   public void setup() {
-    // create the mock table
-    profilerTable = (MockHTable) MockHBaseTableProvider.addToCache(tableName, columnFamily);
-
     // global properties
     Map<String, Object> global = new HashMap<String, Object>() {{
       put(PROFILER_HBASE_TABLE.getKey(), tableName);
       put(PROFILER_COLUMN_FAMILY.getKey(), columnFamily);
-      put(PROFILER_HBASE_TABLE_PROVIDER.getKey(), MockHBaseTableProvider.class.getName());
 
       // client needs to use the same period duration
       put(PROFILER_PERIOD.getKey(), Long.toString(periodDurationMillis));
@@ -503,21 +496,28 @@ public class ProfilerIntegrationTest extends BaseIntegrationTest {
       put(PROFILER_SALT_DIVISOR.getKey(), saltDivisor);
     }};
 
+    Context context = new Context.Builder()
+            .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
+            .build();
+
+    // create the GET_PROFILE function
+    GetProfile getProfileFunction = new GetProfile(new HBaseProfilerClientFactory(new FakeHBaseClientFactory()));
+
+    // ensure the functions that we need can be resolved
+    FunctionResolver functionResolver = new SimpleFunctionResolver()
+            .withClass(FixedLookback.class)
+            .withClass(WindowLookback.class)
+            .withInstance(getProfileFunction);
+
     // create the stellar execution environment
-    executor = new DefaultStellarStatefulExecutor(
-            new SimpleFunctionResolver()
-                    .withClass(GetProfile.class)
-                    .withClass(FixedLookback.class)
-                    .withClass(WindowLookback.class),
-            new Context.Builder()
-                    .with(Context.Capabilities.GLOBAL_CONFIG, () -> global)
-                    .build());
+    executor = new DefaultStellarStatefulExecutor(functionResolver, context);
+
+    // ensure that all HBase "records" are cleared before starting the test
+    new FakeHBaseClient().deleteAll();
   }
 
   @After
   public void tearDown() throws Exception {
-    MockHBaseTableProvider.clear();
-    profilerTable.clear();
     if (runner != null) {
       runner.reset();
     }
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2 b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
index d8bc13d..d89dcab 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/profiler.properties.j2
@@ -61,6 +61,8 @@ profiler.hbase.table={{profiler_hbase_table}}
 profiler.hbase.column.family={{profiler_hbase_cf}}
 profiler.hbase.batch={{profiler_hbase_batch}}
 profiler.hbase.flush.interval.seconds={{profiler_hbase_flush_interval}}
+profiler.hbase.client.factory=org.apache.metron.hbase.client.HBaseTableClientFactory
+profiler.hbase.connection.factory=org.apache.metron.hbase.client.HBaseConnectionFactory
 
 ##### Kafka #####
 
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
index 60d2328..4591f9e 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/main/java/org/apache/metron/hbase/client/HBaseTableClient.java
@@ -82,7 +82,7 @@ public class HBaseTableClient implements HBaseClient {
         connection.close();
       }
     } catch(IOException e) {
-      LOG.error("Error while closing HBase connection",e);
+      LOG.error("Error while closing HBase connection", e);
     }
   }
 
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java
index 8606723..042706d 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/StellarFunctionInfo.java
@@ -77,6 +77,11 @@ public class StellarFunctionInfo {
     return function;
   }
 
+  public StellarFunctionInfo setFunction(StellarFunction function) {
+    this.function = function;  // replaces the function that this info object refers to
+    return this;  // returned so that calls can be chained
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
@@ -90,7 +95,6 @@ public class StellarFunctionInfo {
     // Probably incorrect - comparing Object[] arrays with Arrays.equals
     if (!Arrays.equals(params, that.params)) return false;
     return function != null ? function.equals(that.function) : that.function == null;
-
   }
 
   @Override
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java
index 38a32d1..6fd669b 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/BaseFunctionResolver.java
@@ -52,7 +52,7 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
   /**
    * Maps a function name to the metadata necessary to execute the Stellar function.
    */
-  private Supplier<Map<String, StellarFunctionInfo>> functions;
+  protected Supplier<Map<String, StellarFunctionInfo>> functions;
 
   /**
    * The Stellar execution context that can be used to inform the function resolution process.
@@ -145,6 +145,7 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
    */
   @Override
   public StellarFunction apply(String functionName) {
+    LOG.debug("Resolving function; functionName={}", functionName);
     StellarFunctionInfo info = functions.get().get(functionName);
     if(info == null) {
       throw new IllegalStateException(format("Unknown function: `%s`", functionName));
@@ -156,7 +157,6 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
    * Performs the core process of function resolution.
    */
   protected Map<String, StellarFunctionInfo> resolveFunctions() {
-
     // maps a function name to its definition
     Map<String, StellarFunctionInfo> functions = new HashMap<>();
 
@@ -242,4 +242,18 @@ public abstract class BaseFunctionResolver implements FunctionResolver, Serializ
       return null;
     }
   }
+
+  @Override
+  public BaseFunctionResolver withInstance(StellarFunction function) {
+    // resolve the function's metadata from the instance's class; fail clearly if nothing is resolved
+    StellarFunctionInfo functionInfo = resolveFunction(function.getClass());
+    if (functionInfo == null) {
+      throw new IllegalArgumentException(format("Unable to resolve function: `%s`", function.getClass().getName()));
+    }
+    functionInfo.setFunction(function);  // use the supplied instance as the function
+    Map<String, StellarFunctionInfo> currentFunctions = this.functions.get();
+    currentFunctions.put(functionInfo.getName(), functionInfo);  // now resolvable by name
+    this.functions = Suppliers.memoize(() -> currentFunctions);
+    return this;
+  }
 }
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java
index 4047586..38eb067 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/FunctionResolver.java
@@ -17,13 +17,14 @@
  */
 package org.apache.metron.stellar.dsl.functions.resolver;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Function;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.StellarFunction;
 import org.apache.metron.stellar.dsl.StellarFunctionInfo;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Function;
+
 /**
  * Responsible for function resolution in Stellar.
  */
@@ -46,6 +47,16 @@ public interface FunctionResolver extends Function<String, StellarFunction>, Clo
   void initialize(Context context);
 
   /**
+   * Attempts to resolve the function defined by the provided {@link StellarFunction} instance.
+   *
+   * <p>This can be useful for instrumenting a Stellar function before it is tested.
+   *
+   * @param function The Stellar function to resolve.
+   * @return A function resolver that is able to resolve the provided function.
+   */
+  FunctionResolver withInstance(StellarFunction function);
+
+  /**
    * Perform any cleanup necessary for the loaded Stellar functions.
    */
   @Override
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java
index d2d0e62..a5a5b0c 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/dsl/functions/resolver/SimpleFunctionResolver.java
@@ -18,13 +18,17 @@
 
 package org.apache.metron.stellar.dsl.functions.resolver;
 
-import java.lang.invoke.MethodHandles;
-import java.util.HashSet;
-import java.util.Set;
+import com.google.common.base.Suppliers;
 import org.apache.metron.stellar.dsl.StellarFunction;
+import org.apache.metron.stellar.dsl.StellarFunctionInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * A simple Stellar function resolver that resolves functions from specific
  * classes rather than by searching the classpath.
@@ -49,7 +53,8 @@ public class SimpleFunctionResolver extends BaseFunctionResolver {
   }
 
   /**
-   * Will attempt to resolve any Stellar functions defined within the specified class.
+   * Attempts to resolve any functions defined within a specific class.
+   *
    * @param clazz The class which may contain a Stellar function.
    */
   public SimpleFunctionResolver withClass(Class<? extends StellarFunction> clazz) {