You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/06/12 06:32:15 UTC

[nifi] branch master updated: NIFI-6243 Add Support for AtomicDistributedCache to the HBase 1.x and 2.x Map Cache Services

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e913f57  NIFI-6243 Add Support for AtomicDistributedCache to the HBase 1.x and 2.x Map Cache Services
e913f57 is described below

commit e913f5706f3b3a0a2e98cf77ac4c96f63ba4f104
Author: Shawn Weeks <sw...@weeksconsulting.us>
AuthorDate: Sat May 4 08:58:00 2019 -0500

    NIFI-6243 Add Support for AtomicDistributedCache to the HBase 1.x and 2.x Map Cache Services
    
    This closes #3462.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../hbase/HBase_1_1_2_ClientMapCacheService.java   | 35 +++++++++-
 .../apache/nifi/hbase/MockHBaseClientService.java  | 14 ++--
 .../TestHBase_1_1_2_ClientMapCacheService.java     | 77 +++++++++++++++++++++-
 .../nifi/hbase/HBase_2_ClientMapCacheService.java  | 35 +++++++++-
 .../apache/nifi/hbase/MockHBaseClientService.java  | 14 ++--
 .../hbase/TestHBase_2_ClientMapCacheService.java   | 77 +++++++++++++++++++++-
 6 files changed, 232 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
old mode 100644
new mode 100755
index 3991584..0690db6
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -30,7 +30,8 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -52,7 +53,7 @@ import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
 @CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
     + " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.")
 
-public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
+public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
 
     static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
         .name("HBase Client Service")
@@ -229,6 +230,36 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
     protected void finalize() throws Throwable {
     }
 
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] rowIdBytes = serialize(key, keySerializer);
+        final HBaseRowHandler handler = new HBaseRowHandler();
+
+        final List<Column> columnsList = new ArrayList<>(1);
+        columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
+
+        hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
+
+        if (handler.numRows() > 1) {
+            throw new IOException("Found multiple rows in HBase for key");
+        } else if (handler.numRows() == 1) {
+            return new AtomicCacheEntry<>(key, deserialize(handler.getLastResultBytes(), valueDeserializer), handler.getLastResultBytes());
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public <K, V> boolean replace(AtomicCacheEntry<K, V, byte[]> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+        final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+        final byte[] revision = entry.getRevision().orElse(null);
+        final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
+
+        // If the current revision is unset then only insert the row if it doesn't already exist.
+        return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, revision, putColumn);
+    }
+
     private class HBaseRowHandler implements ResultHandler {
         private int numRows = 0;
         private byte[] lastResultBytes;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 917cb3b..ea805d1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -48,7 +48,7 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
 
     private Table table;
     private String family;
-    private List<Result> results = new ArrayList<>();
+    private Map<String, Result> results = new HashMap<>();
     private KerberosProperties kerberosProperties;
 
     public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
@@ -102,7 +102,7 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
         final Result result = Mockito.mock(Result.class);
         when(result.getRow()).thenReturn(rowArray);
         when(result.rawCells()).thenReturn(cellArray);
-        results.add(result);
+        results.put(rowKey, result);
     }
 
     @Override
@@ -123,12 +123,12 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
 
     @Override
     public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
-        for (Result result : results) {
+        for (Result result : results.values()) {
             if (Arrays.equals(result.getRow(), rowId)) {
                 Cell[] cellArray = result.rawCells();
                 for (Cell cell : cellArray) {
                     if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) {
-                         if (value == null || Arrays.equals(cell.getValueArray(), value)) {
+                         if (value == null || !Arrays.equals(cell.getValueArray(), value)) {
                              return false;
                          }
                     }
@@ -144,21 +144,21 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
 
     protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
         return scanner;
     }
 
     @Override
     protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
         return scanner;
     }
 
     protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
             final Integer limitRows, final Boolean isReversed, final Collection<Column> columns)  throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
         return scanner;
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
index 06848f9..41e4a1c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
@@ -20,6 +20,8 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
@@ -43,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -250,6 +253,78 @@ public class TestHBase_1_1_2_ClientMapCacheService {
     }
 
 
+    @Test
+    public void testFetch() throws InitializationException, IOException {
+        final String key = "key1";
+        final String value = "value1";
+        final byte[] revision = value.getBytes();
+
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+                .asControllerService(HBaseClientService.class);
+
+        final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(AtomicDistributedMapCacheClient.class);
+
+        hBaseCacheService.put(key, value, stringSerializer, stringSerializer);
+
+        final AtomicCacheEntry<String, String, byte[]> atomicCacheEntry = hBaseCacheService.fetch(key, stringSerializer, stringDeserializer);
+
+        assertEquals(key, atomicCacheEntry.getKey());
+        assertEquals(value, atomicCacheEntry.getValue());
+        assertArrayEquals(revision, atomicCacheEntry.getRevision().get());
+    }
+
+    @Test
+    public void testReplace() throws InitializationException, IOException {
+        final String key = "key1";
+        final String value = "value1";
+        final byte[] revision = value.getBytes();
+
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+                .asControllerService(HBaseClientService.class);
+
+        final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(AtomicDistributedMapCacheClient.class);
+
+        // First time value should not already be in cache so this should return true
+        final boolean newResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,null), stringSerializer, stringSerializer);
+        assertTrue(newResult);
+
+        // Second time value is already in cache so this should return false
+        final boolean existingResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,revision), stringSerializer, stringSerializer);
+        assertTrue(existingResult);
+
+        // Third time we're replacing with a new value so this should return true
+        final boolean replaceResult = hBaseCacheService.replace(new AtomicCacheEntry(key,"value2",revision), stringSerializer, stringSerializer);
+        assertTrue(replaceResult);
+    }
+
     private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
         final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile);
         runner.addControllerService("hbaseClient", service);
@@ -259,7 +334,7 @@ public class TestHBase_1_1_2_ClientMapCacheService {
         return service;
     }
 
-    private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
+    private AtomicDistributedMapCacheClient<byte[]> configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
         final HBase_1_1_2_ClientMapCacheService cacheService = new HBase_1_1_2_ClientMapCacheService();
         runner.addControllerService("hbaseCache", cacheService);
         runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
old mode 100644
new mode 100755
index e6ad650..cd3b684
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
@@ -30,7 +30,8 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -52,7 +53,7 @@ import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
 @CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
     + " Uses a HBase_2_ClientService controller to communicate with HBase.")
 
-public class HBase_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
+public class HBase_2_ClientMapCacheService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
 
     static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
         .name("HBase Client Service")
@@ -229,6 +230,36 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
     protected void finalize() throws Throwable {
     }
 
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        final byte[] rowIdBytes = serialize(key, keySerializer);
+        final HBaseRowHandler handler = new HBaseRowHandler();
+
+        final List<Column> columnsList = new ArrayList<>(1);
+        columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
+
+        hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
+
+        if (handler.numRows() > 1) {
+            throw new IOException("Found multiple rows in HBase for key");
+        } else if (handler.numRows() == 1) {
+            return new AtomicCacheEntry<>(key, deserialize(handler.getLastResultBytes(), valueDeserializer), handler.getLastResultBytes());
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public <K, V> boolean replace(AtomicCacheEntry<K, V, byte[]> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+        final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+        final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+        final byte[] revision = entry.getRevision().orElse(null);
+        final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
+
+        // If the current revision is unset then only insert the row if it doesn't already exist.
+        return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, revision, putColumn);
+    }
+
     private class HBaseRowHandler implements ResultHandler {
         private int numRows = 0;
         private byte[] lastResultBytes;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index a30d465..19b0577 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -48,7 +48,7 @@ public class MockHBaseClientService extends HBase_2_ClientService {
 
     private Table table;
     private String family;
-    private List<Result> results = new ArrayList<>();
+    private Map<String, Result> results = new HashMap<>();
     private KerberosProperties kerberosProperties;
 
     public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
@@ -102,7 +102,7 @@ public class MockHBaseClientService extends HBase_2_ClientService {
         final Result result = Mockito.mock(Result.class);
         when(result.getRow()).thenReturn(rowArray);
         when(result.rawCells()).thenReturn(cellArray);
-        results.add(result);
+        results.put(rowKey, result);
     }
 
     @Override
@@ -123,12 +123,12 @@ public class MockHBaseClientService extends HBase_2_ClientService {
 
     @Override
     public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
-        for (Result result : results) {
+        for (Result result : results.values()) {
             if (Arrays.equals(result.getRow(), rowId)) {
                 Cell[] cellArray = result.rawCells();
                 for (Cell cell : cellArray) {
                     if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) {
-                         if (value == null || Arrays.equals(cell.getValueArray(), value)) {
+                         if (value == null || !Arrays.equals(cell.getValueArray(), value)) {
                              return false;
                          }
                     }
@@ -144,21 +144,21 @@ public class MockHBaseClientService extends HBase_2_ClientService {
 
     protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
         return scanner;
     }
 
     @Override
     protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
         return scanner;
     }
 
     protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
             final Integer limitRows, final Boolean isReversed, final Collection<Column> columns)  throws IOException {
         final ResultScanner scanner = Mockito.mock(ResultScanner.class);
-        Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+        Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
         return scanner;
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
index eb3fb45..cacefb6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
@@ -20,6 +20,8 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
@@ -43,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -249,6 +252,78 @@ public class TestHBase_2_ClientMapCacheService {
         assertEquals( result, content);
     }
 
+    @Test
+    public void testFetch() throws InitializationException, IOException {
+        final String key = "key1";
+        final String value = "value1";
+        final byte[] revision = value.getBytes();
+
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+                .asControllerService(HBaseClientService.class);
+
+        final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(AtomicDistributedMapCacheClient.class);
+
+        hBaseCacheService.put(key, value, stringSerializer, stringSerializer);
+
+        final AtomicCacheEntry<String, String, byte[]> atomicCacheEntry = hBaseCacheService.fetch(key, stringSerializer, stringDeserializer);
+
+        assertEquals(key, atomicCacheEntry.getKey());
+        assertEquals(value, atomicCacheEntry.getValue());
+        assertArrayEquals(revision, atomicCacheEntry.getRevision().get());
+    }
+
+    @Test
+    public void testReplace() throws InitializationException, IOException {
+        final String key = "key1";
+        final String value = "value1";
+        final byte[] revision = value.getBytes();
+
+        final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+        // Mock an HBase Table so we can verify the put operations later
+        final Table table = Mockito.mock(Table.class);
+        when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+        // create the controller service and link it to the test processor
+        final MockHBaseClientService service = configureHBaseClientService(runner, table);
+        runner.assertValid(service);
+
+        final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+                .asControllerService(HBaseClientService.class);
+
+        final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+        runner.assertValid(cacheService);
+
+        final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+                .asControllerService(AtomicDistributedMapCacheClient.class);
+
+        // First time value should not already be in cache so this should return true
+        final boolean newResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,null), stringSerializer, stringSerializer);
+        assertTrue(newResult);
+
+        // Second time value is already in cache so this should return false
+        final boolean existingResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,revision), stringSerializer, stringSerializer);
+        assertTrue(existingResult);
+
+        // Third time we're replacing with a new value so this should return true
+        final boolean replaceResult = hBaseCacheService.replace(new AtomicCacheEntry(key,"value2",revision), stringSerializer, stringSerializer);
+        assertTrue(replaceResult);
+    }
+
 
     private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
         final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile);
@@ -259,7 +334,7 @@ public class TestHBase_2_ClientMapCacheService {
         return service;
     }
 
-    private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
+    private AtomicDistributedMapCacheClient<byte[]> configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
         final HBase_2_ClientMapCacheService cacheService = new HBase_2_ClientMapCacheService();
         runner.addControllerService("hbaseCache", cacheService);
         runner.setProperty(cacheService, HBase_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");