You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2017/11/21 20:47:52 UTC
nifi git commit: NIFI-4589: Allow multiple keys in
FetchDistributedMapCache, add subMap operation to API
Repository: nifi
Updated Branches:
refs/heads/master 412b3fbbe -> 16a23f5a0
NIFI-4589: Allow multiple keys in FetchDistributedMapCache, add subMap operation to API
Signed-off-by: Mike Moser <mo...@apache.org>
This closes #2260.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/16a23f5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/16a23f5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/16a23f5a
Branch: refs/heads/master
Commit: 16a23f5a0fe841a609abaf11f19b4d580a88c401
Parents: 412b3fb
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Nov 9 10:31:16 2017 -0500
Committer: Mike Moser <mo...@apache.org>
Committed: Tue Nov 21 20:13:35 2017 +0000
----------------------------------------------------------------------
.../standard/FetchDistributedMapCache.java | 145 ++++++++++++++-----
.../standard/TestFetchDistributedMapCache.java | 58 ++++++++
.../cache/client/DistributedMapCacheClient.java | 29 ++++
.../DistributedMapCacheClientService.java | 46 +++++-
.../distributed/cache/server/map/MapCache.java | 3 +
.../cache/server/map/MapCacheServer.java | 19 ++-
.../cache/server/map/PersistentMapCache.java | 12 ++
.../cache/server/map/SimpleMapCache.java | 27 +++-
.../cache/server/map/TestSimpleMapCache.java | 6 +
9 files changed, 305 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
index 185ed72..e50e843 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
@@ -26,6 +26,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
@@ -40,28 +43,35 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "fetch", "distributed"})
@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Computes a cache key from FlowFile attributes, for each incoming FlowFile, and fetches the value from the Distributed Map Cache associated "
- + "with that key. The incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. If there is no value stored "
- + "under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
+@CapabilityDescription("Computes cache key(s) from FlowFile attributes, for each incoming FlowFile, and fetches the value(s) from the Distributed Map Cache associated "
+ + "with each key. If configured without a destination attribute, the incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. "
+ + "If there is no value stored under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
+ "memory before placing it in it's destination. This could be potentially problematic if the cached value is very large.")
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Cache Value In Attribute' property is set then whatever it is set to "
- + "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache.")
+ + "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache. If multiple cache entry identifiers are selected, "
+ + "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier. For example, if "
+ + "the Cache Entry Identifier property is set to 'id,name', and the user-defined property is named 'fetched', then two attributes will be written, "
+ + "fetched.id and fetched.name, containing their respective values.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.PutDistributedMapCache"})
public class FetchDistributedMapCache extends AbstractProcessor {
@@ -75,8 +85,10 @@ public class FetchDistributedMapCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache Entry Identifier")
- .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
- + "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached")
+ .description("A comma-delimited list of FlowFile attributes, or the results of Attribute Expression Language statements, which will be evaluated "
+ + "against a FlowFile in order to determine the value(s) used to identify duplicates; it is these values that are cached. NOTE: Only a single "
+ + "Cache Entry Identifier is allowed unless Put Cache Value In Attribute is specified. Multiple cache lookups are only supported when the destination "
+ + "is a set of attributes (see the documentation for 'Put Cache Value In Attribute' for more details including naming convention.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.defaultValue("${hash.value}")
@@ -86,7 +98,8 @@ public class FetchDistributedMapCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Put Cache Value In Attribute")
.description("If set, the cache value received will be put into an attribute of the FlowFile instead of a the content of the"
- + "FlowFile. The attribute key to put to is determined by evaluating value of this property.")
+ + "FlowFile. The attribute key to put to is determined by evaluating value of this property. If multiple Cache Entry Identifiers are selected, "
+ + "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier.")
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
@@ -151,6 +164,35 @@ public class FetchDistributedMapCache extends AbstractProcessor {
}
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+ PropertyValue cacheEntryIdentifier = validationContext.getProperty(PROP_CACHE_ENTRY_IDENTIFIER);
+ boolean elPresent = false;
+ try {
+ elPresent = cacheEntryIdentifier.isExpressionLanguagePresent();
+ } catch (NullPointerException npe) {
+ // Unfortunate workaround to a mock framework bug (NIFI-4590)
+ }
+
+ if (elPresent) {
+ // This doesn't do a full job of validating against the requirement that Put Cache Value In Attribute must be set if multiple
+ // Cache Entry Identifiers are supplied (if Expression Language is used). The user could conceivably have a comma-separated list of EL statements,
+ // or a single EL statement with commas inside it but that evaluates to a single item.
+ results.add(new ValidationResult.Builder().valid(true).explanation("Contains Expression Language").build());
+ } else {
+ if (!validationContext.getProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet()) {
+ String identifierString = cacheEntryIdentifier.getValue();
+ if (identifierString.contains(",")) {
+ results.add(new ValidationResult.Builder().valid(false)
+ .explanation("Multiple Cache Entry Identifiers specified without Put Cache Value In Attribute set").build());
+ }
+ }
+ }
+ return results;
+ }
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
@@ -159,51 +201,80 @@ public class FetchDistributedMapCache extends AbstractProcessor {
final ComponentLog logger = getLogger();
final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+ // This block retains the previous behavior when only one Cache Entry Identifier was allowed, so as not to change the expected error message
if (StringUtils.isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
+ List<String> cacheKeys = Arrays.stream(cacheKey.split(",")).filter(path -> !StringUtils.isEmpty(path)).map(String::trim).collect(Collectors.toList());
+ for (int i = 0; i < cacheKeys.size(); i++) {
+ if (StringUtils.isBlank(cacheKeys.get(i))) {
+ // Log first missing identifier, route to failure, and return
+ logger.error("FlowFile {} has no attribute for Cache Entry Identifier in position {}", new Object[]{flowFile, i});
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+ }
+
final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
try {
- final byte[] cacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
-
- if(cacheValue==null){
- session.transfer(flowFile, REL_NOT_FOUND);
- logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
-
+ final Map<String, byte[]> cacheValues;
+ final boolean singleKey = cacheKeys.size() == 1;
+ if (singleKey) {
+ cacheValues = new HashMap<>(1);
+ cacheValues.put(cacheKeys.get(0), cache.get(cacheKey, keySerializer, valueDeserializer));
} else {
- boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet();
- if(putInAttribute){
- String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
- String attributeValue = new String(cacheValue,context.getProperty(PROP_CHARACTER_SET).getValue());
-
- int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
- if(maxLength < attributeValue.length()){
- attributeValue = attributeValue.substring(0,maxLength);
- }
-
- flowFile = session.putAttribute(flowFile, attributeName, attributeValue);
+ cacheValues = cache.subMap(new HashSet<>(cacheKeys), keySerializer, valueDeserializer);
+ }
+ boolean notFound = false;
+ for(Map.Entry<String,byte[]> cacheValueEntry : cacheValues.entrySet()) {
+ final byte[] cacheValue = cacheValueEntry.getValue();
+ if (cacheValue == null) {
+ logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
+ notFound = true;
+ break;
} else {
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(cacheValue);
+ boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet();
+ if (putInAttribute) {
+ String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
+ if (!singleKey) {
+ // Append key to attribute name if multiple keys
+ attributeName += "." + cacheValueEntry.getKey();
}
- });
- }
+ String attributeValue = new String(cacheValue, context.getProperty(PROP_CHARACTER_SET).getValue());
- session.transfer(flowFile, REL_SUCCESS);
- if(putInAttribute){
- logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
- }else {
- logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
+ int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+ if (maxLength < attributeValue.length()) {
+ attributeValue = attributeValue.substring(0, maxLength);
+ }
+
+ flowFile = session.putAttribute(flowFile, attributeName, attributeValue);
+
+ } else if (cacheKeys.size() > 1) {
+ throw new IOException("Multiple Cache Value Identifiers specified without Put Cache Value In Attribute set");
+ } else {
+ // Write single value to content
+ flowFile = session.write(flowFile, out -> out.write(cacheValue));
+ }
+
+ if (putInAttribute) {
+ logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
+ } else {
+ logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
+ }
}
}
-
+ // If the loop was exited because a cache entry was not found, route to REL_NOT_FOUND; otherwise route to REL_SUCCESS
+ if (notFound) {
+ session.transfer(flowFile, REL_NOT_FOUND);
+ } else {
+ session.transfer(flowFile, REL_SUCCESS);
+ }
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
index 549ad13..4c2d991 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
@@ -70,6 +70,19 @@ public class TestFetchDistributedMapCache {
}
@Test
+ public void testNoCacheKeyValue() throws InitializationException {
+
+ runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+ runner.enqueue(new byte[] {});
+ runner.run();
+
+ // Cache key attribute evaluated to empty
+ runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_FAILURE, 1);
+ runner.assertTransferCount(FetchDistributedMapCache.REL_FAILURE, 1);
+ runner.clearTransferState();
+ }
+
+ @Test
public void testFailingCacheService() throws InitializationException, IOException {
service.setFailOnCalls(true);
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
@@ -151,6 +164,51 @@ public class TestFetchDistributedMapCache {
runner.clearTransferState();
}
+ @Test
+ public void testMultipleKeysToAttributes() throws InitializationException, IOException {
+ service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
+ service.put("key2","value2", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
+ runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2");
+ // Not valid to set multiple keys without Put Cache Value In Attribute set
+ runner.assertNotValid();
+ runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
+ runner.assertValid();
+
+ final Map<String, String> props = new HashMap<>();
+ runner.enqueue(new byte[]{}, props);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1);
+ runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1);
+
+ final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0);
+ outputFlowFile.assertAttributeEquals("test.key1","value1");
+ outputFlowFile.assertAttributeEquals("test.key2","value2");
+ }
+
+ @Test
+ public void testMultipleKeysOneNotFound() throws InitializationException, IOException {
+ service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
+ runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2");
+ // Not valid to set multiple keys without Put Cache Value In Attribute set
+ runner.assertNotValid();
+ runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
+ runner.assertValid();
+
+ final Map<String, String> props = new HashMap<>();
+ runner.enqueue(new byte[]{}, props);
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_NOT_FOUND, 1);
+ runner.assertTransferCount(FetchDistributedMapCache.REL_NOT_FOUND, 1);
+
+ final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_NOT_FOUND).get(0);
+ outputFlowFile.assertAttributeEquals("test.key1","value1");
+ }
+
+
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index e593f9d..d2e0085 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -17,6 +17,9 @@
package org.apache.nifi.distributed.cache.client;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -118,6 +121,32 @@ public interface DistributedMapCacheClient extends ControllerService {
<K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
/**
+ * Returns a map from each of the given keys to its value in the cache;
+ * a key that has no cached value is mapped to <code>null</code>
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ * @param keys a set of keys whose values to lookup in the map
+ * @param keySerializer key serializer
+ * @param valueDeserializer value deserializer
+ *
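+ * @return a map with an entry for each given key, holding the cached value or <code>null</code> if none exists;
+ * the default implementation returns <code>null</code> when <code>keys</code> is <code>null</code>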
+ * @throws IOException ex
+ */
+ default <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ // Default behavior is to iterate over the keys, calling get(key) and putting it into the results map
+ if (keys == null) {
+ return null;
+ }
+ Map<K, V> results = new HashMap<>(keys.size());
+ for (K key : keys) {
+ results.put(key, get(key, keySerializer, valueDeserializer));
+ }
+ return results;
+ }
+
+ /**
* Attempts to notify the server that we are finished communicating with it
* and cleans up resources
*
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 9651c26..c454063 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -21,7 +21,10 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -205,6 +208,36 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
@Override
+ public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+ return withCommsSession(session -> {
+ Map<K, V> response = new HashMap<>(keys.size());
+ try {
+ validateProtocolVersion(session, 3);
+
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("subMap");
+ serialize(keys, keySerializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+
+ for (K key : keys) {
+ final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+ response.put(key, valueDeserializer.deserialize(responseBuffer));
+ }
+ } catch (UnsupportedOperationException uoe) {
+ // If the server doesn't support subMap, just emulate it with multiple calls to get()
+ for (K key : keys) {
+ response.put(key, get(key, keySerializer, valueDeserializer));
+ }
+ }
+
+ return response;
+ });
+ }
+
+ @Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() {
@Override
@@ -319,7 +352,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
session = createCommsSession(configContext);
- final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
session.setProtocolVersion(versionNegotiator.getVersion());
@@ -368,6 +401,17 @@ public class DistributedMapCacheClientService extends AbstractControllerService
baos.writeTo(dos);
}
+ private <T> void serialize(final Set<T> values, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
+ // Write the number of elements to follow, then each element preceded by its serialized size
+ dos.writeInt(values.size());
+ for(T value : values) {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(value, baos);
+ dos.writeInt(baos.size());
+ baos.writeTo(dos);
+ }
+ }
+
private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
if (closed) {
throw new IllegalStateException("Client is closed");
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 8bd9bdc..bbffbf9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Map;
public interface MapCache {
@@ -30,6 +31,8 @@ public interface MapCache {
ByteBuffer get(ByteBuffer key) throws IOException;
+ Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException;
+
ByteBuffer remove(ByteBuffer key) throws IOException;
Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException;
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 21090bc..a0a01c1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -56,7 +56,7 @@ public class MapCacheServer extends AbstractCacheServer {
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
- return new StandardVersionNegotiator(2, 1);
+ return new StandardVersionNegotiator(3, 2, 1);
}
@Override
@@ -121,6 +121,23 @@ public class MapCacheServer extends AbstractCacheServer {
break;
}
+ case "subMap": {
+ final int numKeys = dis.readInt();
+ for(int i=0;i<numKeys;i++) {
+ final byte[] key = readValue(dis);
+ final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
+ if (existingValue == null) {
+ // no value is cached for this key; report it as a zero-length response.
+ dos.writeInt(0);
+ } else {
+ // a value is cached for this key; write its length followed by its bytes.
+ final byte[] byteArray = existingValue.array();
+ dos.writeInt(byteArray.length);
+ dos.write(byteArray);
+ }
+ }
+ break;
+ }
case "remove": {
final byte[] key = readValue(dis);
final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 28861ea..6bf6e5a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -111,6 +111,18 @@ public class PersistentMapCache implements MapCache {
}
@Override
+ public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException {
+ if (keys == null) {
+ return null;
+ }
+ Map<ByteBuffer, ByteBuffer> results = new HashMap<>(keys.size());
+ for (ByteBuffer key : keys) {
+ results.put(key, wrapped.get(key));
+ }
+ return results;
+ }
+
+ @Override
public MapCacheRecord fetch(ByteBuffer key) throws IOException {
return wrapped.fetch(key);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index baa2d0f..df78332 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -60,7 +60,7 @@ public class SimpleMapCache implements MapCache {
@Override
public String toString() {
- return "SimpleSetCache[service id=" + serviceIdentifier + "]";
+ return "SimpleMapCache[service id=" + serviceIdentifier + "]";
}
// don't need synchronized because this method is only called when the writeLock is held, and all
@@ -171,6 +171,31 @@ public class SimpleMapCache implements MapCache {
}
@Override
+ public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException {
+ if (keys == null) {
+ return null;
+ }
+ Map<ByteBuffer, ByteBuffer> results = new HashMap<>(keys.size());
+ readLock.lock();
+ try {
+ keys.forEach((key) -> {
+ final MapCacheRecord record = cache.get(key);
+ if (record == null) {
+ results.put(key, null);
+ } else {
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, key);
+ results.put(key, record.getValue());
+ }
+ });
+ } finally {
+ readLock.unlock();
+ }
+ return results;
+ }
+
+ @Override
public ByteBuffer remove(ByteBuffer key) throws IOException {
writeLock.lock();
try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
index 2e19714..0cd2813 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
@@ -20,6 +20,8 @@ import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -89,6 +91,10 @@ public class TestSimpleMapCache {
assertNull(putResult.getEvicted());
assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision());
+ // Get multiple keys
+ Map<ByteBuffer, ByteBuffer> results = cache.subMap(Arrays.asList(key1, key2, key3));
+ assertEquals(3, results.size());
+
}
@Test