You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/06/12 06:32:15 UTC
[nifi] branch master updated: NIFI-6243 Add Support for
AtomicDistributedCache to the HBase 1.x and 2.x Map Cache Services
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new e913f57 NIFI-6243 Add Support for AtomicDistributedCache to the HBase 1.x and 2.x Map Cache Services
e913f57 is described below
commit e913f5706f3b3a0a2e98cf77ac4c96f63ba4f104
Author: Shawn Weeks <sw...@weeksconsulting.us>
AuthorDate: Sat May 4 08:58:00 2019 -0500
NIFI-6243 Add Support for AtomicDistributedCache to the HBase 1.x and 2.x Map Cache Services
This closes #3462.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../hbase/HBase_1_1_2_ClientMapCacheService.java | 35 +++++++++-
.../apache/nifi/hbase/MockHBaseClientService.java | 14 ++--
.../TestHBase_1_1_2_ClientMapCacheService.java | 77 +++++++++++++++++++++-
.../nifi/hbase/HBase_2_ClientMapCacheService.java | 35 +++++++++-
.../apache/nifi/hbase/MockHBaseClientService.java | 14 ++--
.../hbase/TestHBase_2_ClientMapCacheService.java | 77 +++++++++++++++++++++-
6 files changed, 232 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
old mode 100644
new mode 100755
index 3991584..0690db6
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientMapCacheService.java
@@ -30,7 +30,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -52,7 +53,7 @@ import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
+ " Uses a HBase_1_1_2_ClientService controller to communicate with HBase.")
-public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
+public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
@@ -229,6 +230,36 @@ public class HBase_1_1_2_ClientMapCacheService extends AbstractControllerService
protected void finalize() throws Throwable {
}
+ @Override
+ public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ final byte[] rowIdBytes = serialize(key, keySerializer);
+ final HBaseRowHandler handler = new HBaseRowHandler();
+
+ final List<Column> columnsList = new ArrayList<>(1);
+ columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
+
+ hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
+
+ if (handler.numRows() > 1) {
+ throw new IOException("Found multiple rows in HBase for key");
+ } else if (handler.numRows() == 1) {
+ return new AtomicCacheEntry<>(key, deserialize(handler.getLastResultBytes(), valueDeserializer), handler.getLastResultBytes());
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public <K, V> boolean replace(AtomicCacheEntry<K, V, byte[]> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+ final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+ final byte[] revision = entry.getRevision().orElse(null);
+ final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
+
+ // If the current revision is unset then only insert the row if it doesn't already exist.
+ return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, revision, putColumn);
+ }
+
private class HBaseRowHandler implements ResultHandler {
private int numRows = 0;
private byte[] lastResultBytes;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index 917cb3b..ea805d1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -48,7 +48,7 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
private Table table;
private String family;
- private List<Result> results = new ArrayList<>();
+ private Map<String, Result> results = new HashMap<>();
private KerberosProperties kerberosProperties;
public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
@@ -102,7 +102,7 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
final Result result = Mockito.mock(Result.class);
when(result.getRow()).thenReturn(rowArray);
when(result.rawCells()).thenReturn(cellArray);
- results.add(result);
+ results.put(rowKey, result);
}
@Override
@@ -123,12 +123,12 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
- for (Result result : results) {
+ for (Result result : results.values()) {
if (Arrays.equals(result.getRow(), rowId)) {
Cell[] cellArray = result.rawCells();
for (Cell cell : cellArray) {
if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) {
- if (value == null || Arrays.equals(cell.getValueArray(), value)) {
+ if (value == null || !Arrays.equals(cell.getValueArray(), value)) {
return false;
}
}
@@ -144,21 +144,21 @@ public class MockHBaseClientService extends HBase_1_1_2_ClientService {
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
- Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+ Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
- Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+ Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
- Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+ Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
index 06848f9..41e4a1c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_1_1_2_ClientMapCacheService.java
@@ -20,6 +20,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
@@ -43,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -250,6 +253,78 @@ public class TestHBase_1_1_2_ClientMapCacheService {
}
+ @Test
+ public void testFetch() throws InitializationException, IOException {
+ final String key = "key1";
+ final String value = "value1";
+ final byte[] revision = value.getBytes();
+
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+ // Mock an HBase Table to hand to the mock HBase client service
+ final Table table = Mockito.mock(Table.class);
+ when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+ // create the controller service and link it to the test processor
+ final MockHBaseClientService service = configureHBaseClientService(runner, table);
+ runner.assertValid(service);
+
+ final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+ .asControllerService(HBaseClientService.class);
+
+ final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+ runner.assertValid(cacheService);
+
+ final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+ .asControllerService(AtomicDistributedMapCacheClient.class);
+
+ hBaseCacheService.put(key, value, stringSerializer, stringSerializer);
+
+ final AtomicCacheEntry<String, String, byte[]> atomicCacheEntry = hBaseCacheService.fetch(key, stringSerializer, stringDeserializer);
+
+ assertEquals(key, atomicCacheEntry.getKey());
+ assertEquals(value, atomicCacheEntry.getValue());
+ assertArrayEquals(revision, atomicCacheEntry.getRevision().get());
+ }
+
+ @Test
+ public void testReplace() throws InitializationException, IOException {
+ final String key = "key1";
+ final String value = "value1";
+ final byte[] revision = value.getBytes();
+
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+ // Mock an HBase Table to hand to the mock HBase client service
+ final Table table = Mockito.mock(Table.class);
+ when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+ // create the controller service and link it to the test processor
+ final MockHBaseClientService service = configureHBaseClientService(runner, table);
+ runner.assertValid(service);
+
+ final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+ .asControllerService(HBaseClientService.class);
+
+ final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+ runner.assertValid(cacheService);
+
+ final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+ .asControllerService(AtomicDistributedMapCacheClient.class);
+
+ // First time value should not already be in cache so this should return true
+ final boolean newResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,null), stringSerializer, stringSerializer);
+ assertTrue(newResult);
+
+ // Second time the supplied revision matches the value already in cache so this should return true
+ final boolean existingResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,revision), stringSerializer, stringSerializer);
+ assertTrue(existingResult);
+
+ // Third time we're replacing with a new value so this should return true
+ final boolean replaceResult = hBaseCacheService.replace(new AtomicCacheEntry(key,"value2",revision), stringSerializer, stringSerializer);
+ assertTrue(replaceResult);
+ }
+
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile);
runner.addControllerService("hbaseClient", service);
@@ -259,7 +334,7 @@ public class TestHBase_1_1_2_ClientMapCacheService {
return service;
}
- private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
+ private AtomicDistributedMapCacheClient<byte[]> configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
final HBase_1_1_2_ClientMapCacheService cacheService = new HBase_1_1_2_ClientMapCacheService();
runner.addControllerService("hbaseCache", cacheService);
runner.setProperty(cacheService, HBase_1_1_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
old mode 100644
new mode 100755
index e6ad650..cd3b684
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java
@@ -30,7 +30,8 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -52,7 +53,7 @@ import static org.apache.nifi.hbase.VisibilityLabelUtils.AUTHORIZATIONS;
@CapabilityDescription("Provides the ability to use an HBase table as a cache, in place of a DistributedMapCache."
+ " Uses a HBase_2_ClientService controller to communicate with HBase.")
-public class HBase_2_ClientMapCacheService extends AbstractControllerService implements DistributedMapCacheClient {
+public class HBase_2_ClientMapCacheService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
static final PropertyDescriptor HBASE_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("HBase Client Service")
@@ -229,6 +230,36 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
protected void finalize() throws Throwable {
}
+ @Override
+ public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ final byte[] rowIdBytes = serialize(key, keySerializer);
+ final HBaseRowHandler handler = new HBaseRowHandler();
+
+ final List<Column> columnsList = new ArrayList<>(1);
+ columnsList.add(new Column(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes));
+
+ hBaseClientService.scan(hBaseCacheTableName, rowIdBytes, rowIdBytes, columnsList, authorizations, handler);
+
+ if (handler.numRows() > 1) {
+ throw new IOException("Found multiple rows in HBase for key");
+ } else if (handler.numRows() == 1) {
+ return new AtomicCacheEntry<>(key, deserialize(handler.getLastResultBytes(), valueDeserializer), handler.getLastResultBytes());
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public <K, V> boolean replace(AtomicCacheEntry<K, V, byte[]> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
+ final byte[] rowIdBytes = serialize(entry.getKey(), keySerializer);
+ final byte[] valueBytes = serialize(entry.getValue(), valueSerializer);
+ final byte[] revision = entry.getRevision().orElse(null);
+ final PutColumn putColumn = new PutColumn(hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, valueBytes);
+
+ // If the current revision is unset then only insert the row if it doesn't already exist.
+ return hBaseClientService.checkAndPut(hBaseCacheTableName, rowIdBytes, hBaseColumnFamilyBytes, hBaseColumnQualifierBytes, revision, putColumn);
+ }
+
private class HBaseRowHandler implements ResultHandler {
private int numRows = 0;
private byte[] lastResultBytes;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
index a30d465..19b0577 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/MockHBaseClientService.java
@@ -48,7 +48,7 @@ public class MockHBaseClientService extends HBase_2_ClientService {
private Table table;
private String family;
- private List<Result> results = new ArrayList<>();
+ private Map<String, Result> results = new HashMap<>();
private KerberosProperties kerberosProperties;
public MockHBaseClientService(final Table table, final String family, final KerberosProperties kerberosProperties) {
@@ -102,7 +102,7 @@ public class MockHBaseClientService extends HBase_2_ClientService {
final Result result = Mockito.mock(Result.class);
when(result.getRow()).thenReturn(rowArray);
when(result.rawCells()).thenReturn(cellArray);
- results.add(result);
+ results.put(rowKey, result);
}
@Override
@@ -123,12 +123,12 @@ public class MockHBaseClientService extends HBase_2_ClientService {
@Override
public boolean checkAndPut(final String tableName, final byte[] rowId, final byte[] family, final byte[] qualifier, final byte[] value, final PutColumn column) throws IOException {
- for (Result result : results) {
+ for (Result result : results.values()) {
if (Arrays.equals(result.getRow(), rowId)) {
Cell[] cellArray = result.rawCells();
for (Cell cell : cellArray) {
if (Arrays.equals(cell.getFamilyArray(), family) && Arrays.equals(cell.getQualifierArray(), qualifier)) {
- if (value == null || Arrays.equals(cell.getValueArray(), value)) {
+ if (value == null || !Arrays.equals(cell.getValueArray(), value)) {
return false;
}
}
@@ -144,21 +144,21 @@ public class MockHBaseClientService extends HBase_2_ClientService {
protected ResultScanner getResults(Table table, byte[] startRow, byte[] endRow, Collection<Column> columns, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
- Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+ Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
@Override
protected ResultScanner getResults(Table table, Collection<Column> columns, Filter filter, long minTime, List<String> labels) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
- Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+ Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
final ResultScanner scanner = Mockito.mock(ResultScanner.class);
- Mockito.when(scanner.iterator()).thenReturn(results.iterator());
+ Mockito.when(scanner.iterator()).thenReturn(results.values().iterator());
return scanner;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
index eb3fb45..cacefb6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/test/java/org/apache/nifi/hbase/TestHBase_2_ClientMapCacheService.java
@@ -20,6 +20,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
@@ -43,6 +45,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -249,6 +252,78 @@ public class TestHBase_2_ClientMapCacheService {
assertEquals( result, content);
}
+ @Test
+ public void testFetch() throws InitializationException, IOException {
+ final String key = "key1";
+ final String value = "value1";
+ final byte[] revision = value.getBytes();
+
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+ // Mock an HBase Table to hand to the mock HBase client service
+ final Table table = Mockito.mock(Table.class);
+ when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+ // create the controller service and link it to the test processor
+ final MockHBaseClientService service = configureHBaseClientService(runner, table);
+ runner.assertValid(service);
+
+ final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+ .asControllerService(HBaseClientService.class);
+
+ final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+ runner.assertValid(cacheService);
+
+ final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+ .asControllerService(AtomicDistributedMapCacheClient.class);
+
+ hBaseCacheService.put(key, value, stringSerializer, stringSerializer);
+
+ final AtomicCacheEntry<String, String, byte[]> atomicCacheEntry = hBaseCacheService.fetch(key, stringSerializer, stringDeserializer);
+
+ assertEquals(key, atomicCacheEntry.getKey());
+ assertEquals(value, atomicCacheEntry.getValue());
+ assertArrayEquals(revision, atomicCacheEntry.getRevision().get());
+ }
+
+ @Test
+ public void testReplace() throws InitializationException, IOException {
+ final String key = "key1";
+ final String value = "value1";
+ final byte[] revision = value.getBytes();
+
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+
+ // Mock an HBase Table to hand to the mock HBase client service
+ final Table table = Mockito.mock(Table.class);
+ when(table.getName()).thenReturn(TableName.valueOf(tableName));
+
+ // create the controller service and link it to the test processor
+ final MockHBaseClientService service = configureHBaseClientService(runner, table);
+ runner.assertValid(service);
+
+ final HBaseClientService hBaseClientService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CLIENT_SERVICE)
+ .asControllerService(HBaseClientService.class);
+
+ final AtomicDistributedMapCacheClient<byte[]> cacheService = configureHBaseCacheService(runner, hBaseClientService);
+ runner.assertValid(cacheService);
+
+ final AtomicDistributedMapCacheClient<byte[]> hBaseCacheService = runner.getProcessContext().getProperty(TestProcessor.HBASE_CACHE_SERVICE)
+ .asControllerService(AtomicDistributedMapCacheClient.class);
+
+ // First time value should not already be in cache so this should return true
+ final boolean newResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,null), stringSerializer, stringSerializer);
+ assertTrue(newResult);
+
+ // Second time the supplied revision matches the value already in cache so this should return true
+ final boolean existingResult = hBaseCacheService.replace(new AtomicCacheEntry(key,value,revision), stringSerializer, stringSerializer);
+ assertTrue(existingResult);
+
+ // Third time we're replacing with a new value so this should return true
+ final boolean replaceResult = hBaseCacheService.replace(new AtomicCacheEntry(key,"value2",revision), stringSerializer, stringSerializer);
+ assertTrue(replaceResult);
+ }
+
private MockHBaseClientService configureHBaseClientService(final TestRunner runner, final Table table) throws InitializationException {
final MockHBaseClientService service = new MockHBaseClientService(table, "family1", kerberosPropsWithFile);
@@ -259,7 +334,7 @@ public class TestHBase_2_ClientMapCacheService {
return service;
}
- private DistributedMapCacheClient configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
+ private AtomicDistributedMapCacheClient<byte[]> configureHBaseCacheService(final TestRunner runner, final HBaseClientService service) throws InitializationException {
final HBase_2_ClientMapCacheService cacheService = new HBase_2_ClientMapCacheService();
runner.addControllerService("hbaseCache", cacheService);
runner.setProperty(cacheService, HBase_2_ClientMapCacheService.HBASE_CLIENT_SERVICE, "hbaseClient");