You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mo...@apache.org on 2017/11/21 20:47:52 UTC

nifi git commit: NIFI-4589: Allow multiple keys in FetchDistributedMapCache, add subMap operation to API

Repository: nifi
Updated Branches:
  refs/heads/master 412b3fbbe -> 16a23f5a0


NIFI-4589: Allow multiple keys in FetchDistributedMapCache, add subMap operation to API

Signed-off-by: Mike Moser <mo...@apache.org>

This closes #2260.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/16a23f5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/16a23f5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/16a23f5a

Branch: refs/heads/master
Commit: 16a23f5a0fe841a609abaf11f19b4d580a88c401
Parents: 412b3fb
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Nov 9 10:31:16 2017 -0500
Committer: Mike Moser <mo...@apache.org>
Committed: Tue Nov 21 20:13:35 2017 +0000

----------------------------------------------------------------------
 .../standard/FetchDistributedMapCache.java      | 145 ++++++++++++++-----
 .../standard/TestFetchDistributedMapCache.java  |  58 ++++++++
 .../cache/client/DistributedMapCacheClient.java |  29 ++++
 .../DistributedMapCacheClientService.java       |  46 +++++-
 .../distributed/cache/server/map/MapCache.java  |   3 +
 .../cache/server/map/MapCacheServer.java        |  19 ++-
 .../cache/server/map/PersistentMapCache.java    |  12 ++
 .../cache/server/map/SimpleMapCache.java        |  27 +++-
 .../cache/server/map/TestSimpleMapCache.java    |   6 +
 9 files changed, 305 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
index 185ed72..e50e843 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java
@@ -26,6 +26,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
@@ -40,28 +43,35 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 @EventDriven
 @SupportsBatching
 @Tags({"map", "cache", "fetch", "distributed"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Computes a cache key from FlowFile attributes, for each incoming FlowFile, and fetches the value from the Distributed Map Cache associated "
-        + "with that key. The incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. If there is no value stored "
-        + "under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
+@CapabilityDescription("Computes cache key(s) from FlowFile attributes, for each incoming FlowFile, and fetches the value(s) from the Distributed Map Cache associated "
+        + "with each key. If configured without a destination attribute, the incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. "
+        + "If there is no value stored under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
         + "memory before placing it in it's destination. This could be potentially problematic if the cached value is very large.")
 @WritesAttribute(attribute = "user-defined", description = "If the 'Put Cache Value In Attribute' property is set then whatever it is set to "
-        + "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache.")
+        + "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache. If multiple cache entry identifiers are selected, "
+        + "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier. For example, if "
+        + "the Cache Entry Identifier property is set to 'id,name', and the user-defined property is named 'fetched', then two attributes will be written, "
+        + "fetched.id and fetched.name, containing their respective values.")
 @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
         "org.apache.nifi.processors.standard.PutDistributedMapCache"})
 public class FetchDistributedMapCache extends AbstractProcessor {
@@ -75,8 +85,10 @@ public class FetchDistributedMapCache extends AbstractProcessor {
 
     public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
             .name("Cache Entry Identifier")
-            .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
-                    + "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached")
+            .description("A comma-delimited list of FlowFile attributes, or the results of Attribute Expression Language statements, which will be evaluated "
+                    + "against a FlowFile in order to determine the value(s) used to identify duplicates; it is these values that are cached. NOTE: Only a single "
+                    + "Cache Entry Identifier is allowed unless Put Cache Value In Attribute is specified. Multiple cache lookups are only supported when the destination "
+                    + "is a set of attributes (see the documentation for 'Put Cache Value In Attribute' for more details including naming convention.")
             .required(true)
             .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
             .defaultValue("${hash.value}")
@@ -86,7 +98,8 @@ public class FetchDistributedMapCache extends AbstractProcessor {
     public static final PropertyDescriptor PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
             .name("Put Cache Value In Attribute")
             .description("If set, the cache value received will be put into an attribute of the FlowFile instead of a the content of the"
-                    + "FlowFile. The attribute key to put to is determined by evaluating value of this property.")
+                    + "FlowFile. The attribute key to put to is determined by evaluating value of this property. If multiple Cache Entry Identifiers are selected, "
+                    + "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier.")
             .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
             .expressionLanguageSupported(true)
             .build();
@@ -151,6 +164,35 @@ public class FetchDistributedMapCache extends AbstractProcessor {
     }
 
     @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
+
+        PropertyValue cacheEntryIdentifier = validationContext.getProperty(PROP_CACHE_ENTRY_IDENTIFIER);
+        boolean elPresent = false;
+        try {
+            elPresent = cacheEntryIdentifier.isExpressionLanguagePresent();
+        } catch (NullPointerException npe) {
+            // Unfortunate workaround to a mock framework bug (NIFI-4590)
+        }
+
+        if (elPresent) {
+            // This doesn't do a full job of validating against the requirement that Put Cache Value In Attribute must be set if multiple
+            // Cache Entry Identifiers are supplied (if Expression Language is used). The user could conceivably have a comma-separated list of EL statements,
+            // or a single EL statement with commas inside it but that evaluates to a single item.
+            results.add(new ValidationResult.Builder().valid(true).explanation("Contains Expression Language").build());
+        } else {
+            if (!validationContext.getProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet()) {
+                String identifierString = cacheEntryIdentifier.getValue();
+                if (identifierString.contains(",")) {
+                    results.add(new ValidationResult.Builder().valid(false)
+                            .explanation("Multiple Cache Entry Identifiers specified without Put Cache Value In Attribute set").build());
+                }
+            }
+        }
+        return results;
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
@@ -159,51 +201,80 @@ public class FetchDistributedMapCache extends AbstractProcessor {
 
         final ComponentLog logger = getLogger();
         final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
+        // This block retains the previous behavior when only one Cache Entry Identifier was allowed, so as not to change the expected error message
         if (StringUtils.isBlank(cacheKey)) {
             logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
+        List<String> cacheKeys = Arrays.stream(cacheKey.split(",")).filter(path -> !StringUtils.isEmpty(path)).map(String::trim).collect(Collectors.toList());
+        for (int i = 0; i < cacheKeys.size(); i++) {
+            if (StringUtils.isBlank(cacheKeys.get(i))) {
+                // Log first missing identifier, route to failure, and return
+                logger.error("FlowFile {} has no attribute for Cache Entry Identifier in position {}", new Object[]{flowFile, i});
+                flowFile = session.penalize(flowFile);
+                session.transfer(flowFile, REL_FAILURE);
+                return;
+            }
+        }
+
         final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
 
         try {
-            final byte[] cacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
-
-            if(cacheValue==null){
-                session.transfer(flowFile, REL_NOT_FOUND);
-                logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
-
+            final Map<String, byte[]> cacheValues;
+            final boolean singleKey = cacheKeys.size() == 1;
+            if (singleKey) {
+                cacheValues = new HashMap<>(1);
+                cacheValues.put(cacheKeys.get(0), cache.get(cacheKey, keySerializer, valueDeserializer));
             } else {
-                boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet();
-                if(putInAttribute){
-                    String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
-                    String attributeValue = new String(cacheValue,context.getProperty(PROP_CHARACTER_SET).getValue());
-
-                    int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
-                    if(maxLength < attributeValue.length()){
-                        attributeValue = attributeValue.substring(0,maxLength);
-                    }
-
-                    flowFile = session.putAttribute(flowFile, attributeName, attributeValue);
+                cacheValues = cache.subMap(new HashSet<>(cacheKeys), keySerializer, valueDeserializer);
+            }
+            boolean notFound = false;
+            for(Map.Entry<String,byte[]> cacheValueEntry : cacheValues.entrySet()) {
+                final byte[] cacheValue = cacheValueEntry.getValue();
 
+                if (cacheValue == null) {
+                    logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
+                    notFound = true;
+                    break;
                 } else {
-                    flowFile = session.write(flowFile, new OutputStreamCallback() {
-                        @Override
-                        public void process(OutputStream out) throws IOException {
-                            out.write(cacheValue);
+                    boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet();
+                    if (putInAttribute) {
+                        String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
+                        if (!singleKey) {
+                            // Append key to attribute name if multiple keys
+                            attributeName += "." + cacheValueEntry.getKey();
                         }
-                    });
-                }
+                        String attributeValue = new String(cacheValue, context.getProperty(PROP_CHARACTER_SET).getValue());
 
-                session.transfer(flowFile, REL_SUCCESS);
-                if(putInAttribute){
-                    logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
-                }else {
-                    logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
+                        int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
+                        if (maxLength < attributeValue.length()) {
+                            attributeValue = attributeValue.substring(0, maxLength);
+                        }
+
+                        flowFile = session.putAttribute(flowFile, attributeName, attributeValue);
+
+                    } else if (cacheKeys.size() > 1) {
+                        throw new IOException("Multiple Cache Value Identifiers specified without Put Cache Value In Attribute set");
+                    } else {
+                        // Write single value to content
+                        flowFile = session.write(flowFile, out -> out.write(cacheValue));
+                    }
+
+                    if (putInAttribute) {
+                        logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
+                    } else {
+                        logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
+                    }
                 }
             }
-
+            // If the loop was exited because a cache entry was not found, route to REL_NOT_FOUND; otherwise route to REL_SUCCESS
+            if (notFound) {
+                session.transfer(flowFile, REL_NOT_FOUND);
+            } else {
+                session.transfer(flowFile, REL_SUCCESS);
+            }
         } catch (final IOException e) {
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
index 549ad13..4c2d991 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java
@@ -70,6 +70,19 @@ public class TestFetchDistributedMapCache {
     }
 
     @Test
+    public void testNoCacheKeyValue() throws InitializationException {
+
+        runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
+        runner.enqueue(new byte[] {});
+        runner.run();
+
+        // Cache key attribute evaluated to empty
+        runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_FAILURE, 1);
+        runner.assertTransferCount(FetchDistributedMapCache.REL_FAILURE, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
     public void testFailingCacheService() throws InitializationException, IOException {
         service.setFailOnCalls(true);
         runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
@@ -151,6 +164,51 @@ public class TestFetchDistributedMapCache {
         runner.clearTransferState();
     }
 
+    @Test
+    public void testMultipleKeysToAttributes() throws InitializationException, IOException {
+        service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
+        service.put("key2","value2", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
+        runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2");
+        // Not valid to set multiple keys without Put Cache Value In Attribute set
+        runner.assertNotValid();
+        runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
+        runner.assertValid();
+
+        final Map<String, String> props = new HashMap<>();
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1);
+        runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1);
+
+        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("test.key1","value1");
+        outputFlowFile.assertAttributeEquals("test.key2","value2");
+    }
+
+    @Test
+    public void testMultipleKeysOneNotFound() throws InitializationException, IOException {
+        service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
+        runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2");
+        // Not valid to set multiple keys without Put Cache Value In Attribute set
+        runner.assertNotValid();
+        runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
+        runner.assertValid();
+
+        final Map<String, String> props = new HashMap<>();
+        runner.enqueue(new byte[]{}, props);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_NOT_FOUND, 1);
+        runner.assertTransferCount(FetchDistributedMapCache.REL_NOT_FOUND, 1);
+
+        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_NOT_FOUND).get(0);
+        outputFlowFile.assertAttributeEquals("test.key1","value1");
+    }
+
+
     private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
         private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
         private boolean failOnCalls = false;

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
index e593f9d..d2e0085 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java
@@ -17,6 +17,9 @@
 package org.apache.nifi.distributed.cache.client;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -118,6 +121,32 @@ public interface DistributedMapCacheClient extends ControllerService {
     <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
 
     /**
+     * Returns the values in the cache for the given keys, if they exist;
+     * otherwise returns <code>null</code>
+     *
+     * @param <K> the key type
+     * @param <V> the value type
+     * @param keys a set of keys whose values to lookup in the map
+     * @param keySerializer key serializer
+     * @param valueDeserializer value serializer
+     *
+     * @return the value in the cache for the given key, if one exists;
+     * otherwise returns <code>null</code>
+     * @throws IOException ex
+     */
+    default <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        // Default behavior is to iterate over the keys, calling get(key) and putting it into the results map
+        if (keys == null) {
+            return null;
+        }
+        Map<K, V> results = new HashMap<>(keys.size());
+        for (K key : keys) {
+            results.put(key, get(key, keySerializer, valueDeserializer));
+        }
+        return results;
+    }
+
+    /**
      * Attempts to notify the server that we are finished communicating with it
      * and cleans up resources
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 9651c26..c454063 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -21,7 +21,10 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -205,6 +208,36 @@ public class DistributedMapCacheClientService extends AbstractControllerService
     }
 
     @Override
+    public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(session -> {
+            Map<K, V> response = new HashMap<>(keys.size());
+            try {
+                validateProtocolVersion(session, 3);
+
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("subMap");
+                serialize(keys, keySerializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+
+                for (K key : keys) {
+                    final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                    response.put(key, valueDeserializer.deserialize(responseBuffer));
+                }
+            } catch (UnsupportedOperationException uoe) {
+                // If the server doesn't support subMap, just emulate it with multiple calls to get()
+                for (K key : keys) {
+                    response.put(key, get(key, keySerializer, valueDeserializer));
+                }
+            }
+
+            return response;
+        });
+    }
+
+    @Override
     public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
         return withCommsSession(new CommsAction<Boolean>() {
             @Override
@@ -319,7 +352,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         }
 
         session = createCommsSession(configContext);
-        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
         try {
             ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
             session.setProtocolVersion(versionNegotiator.getVersion());
@@ -368,6 +401,17 @@ public class DistributedMapCacheClientService extends AbstractControllerService
         baos.writeTo(dos);
     }
 
+    private <T> void serialize(final Set<T> values, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
+        // Write the number of elements to follow, then each element and its size
+        dos.writeInt(values.size());
+        for(T value : values) {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            serializer.serialize(value, baos);
+            dos.writeInt(baos.size());
+            baos.writeTo(dos);
+        }
+    }
+
     private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
         if (closed) {
             throw new IllegalStateException("Client is closed");

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index 8bd9bdc..bbffbf9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 
 public interface MapCache {
@@ -30,6 +31,8 @@ public interface MapCache {
 
     ByteBuffer get(ByteBuffer key) throws IOException;
 
+    Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException;
+
     ByteBuffer remove(ByteBuffer key) throws IOException;
 
     Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 21090bc..a0a01c1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -56,7 +56,7 @@ public class MapCacheServer extends AbstractCacheServer {
      * for details of each version enhancements.
      */
     protected StandardVersionNegotiator getVersionNegotiator() {
-        return new StandardVersionNegotiator(2, 1);
+        return new StandardVersionNegotiator(3, 2, 1);
     }
 
     @Override
@@ -121,6 +121,23 @@ public class MapCacheServer extends AbstractCacheServer {
 
                 break;
             }
+            case "subMap": {
+                final int numKeys = dis.readInt();
+                for(int i=0;i<numKeys;i++) {
+                    final byte[] key = readValue(dis);
+                    final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
+                    if (existingValue == null) {
+                        // there was no existing value.
+                        dos.writeInt(0);
+                    } else {
+                        // a value already existed.
+                        final byte[] byteArray = existingValue.array();
+                        dos.writeInt(byteArray.length);
+                        dos.write(byteArray);
+                    }
+                }
+                break;
+            }
             case "remove": {
                 final byte[] key = readValue(dis);
                 final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index 28861ea..6bf6e5a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -111,6 +111,18 @@ public class PersistentMapCache implements MapCache {
     }
 
     @Override
+    public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException {
+        if (keys == null) {
+            return null;
+        }
+        Map<ByteBuffer, ByteBuffer> results = new HashMap<>(keys.size());
+        for (ByteBuffer key : keys) {
+            results.put(key, wrapped.get(key));
+        }
+        return results;
+    }
+
+    @Override
     public MapCacheRecord fetch(ByteBuffer key) throws IOException {
         return wrapped.fetch(key);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index baa2d0f..df78332 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -60,7 +60,7 @@ public class SimpleMapCache implements MapCache {
 
     @Override
     public String toString() {
-        return "SimpleSetCache[service id=" + serviceIdentifier + "]";
+        return "SimpleMapCache[service id=" + serviceIdentifier + "]";
     }
 
     // don't need synchronized because this method is only called when the writeLock is held, and all
@@ -171,6 +171,31 @@ public class SimpleMapCache implements MapCache {
     }
 
     @Override
+    public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException {
+        if (keys == null) {
+            return null;
+        }
+        Map<ByteBuffer, ByteBuffer> results = new HashMap<>(keys.size());
+        readLock.lock();
+        try {
+            keys.forEach((key) -> {
+                final MapCacheRecord record = cache.get(key);
+                if (record == null) {
+                    results.put(key, null);
+                } else {
+                    inverseCacheMap.remove(record);
+                    record.hit();
+                    inverseCacheMap.put(record, key);
+                    results.put(key, record.getValue());
+                }
+            });
+        } finally {
+            readLock.unlock();
+        }
+        return results;
+    }
+
+    @Override
     public ByteBuffer remove(ByteBuffer key) throws IOException {
         writeLock.lock();
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/16a23f5a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
index 2e19714..0cd2813 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
@@ -20,6 +20,8 @@ import org.apache.nifi.distributed.cache.server.EvictionPolicy;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -89,6 +91,10 @@ public class TestSimpleMapCache {
         assertNull(putResult.getEvicted());
         assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision());
 
+        // Get multiple keys
+        Map<ByteBuffer, ByteBuffer> results = cache.subMap(Arrays.asList(key1, key2, key3));
+        assertEquals(3, results.size());
+
     }
 
     @Test