You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/06/21 16:48:08 UTC

[3/3] nifi git commit: NIFI-4043 Initial commit of nifi-redis-bundle

NIFI-4043 Initial commit of nifi-redis-bundle

NIFI-4061 Initial version of RedisStateProvider
- Adding PropertyContext and updating existing contexts to extend it
- Added embedded Redis for unit testing
- Added wrapped StateProvider with NAR ClassLoader in StandardStateManagerProvider
- Updating state-management.xml with config for RedisStateProvider
- Renaming tests that use RedisServer to be IT tests so they don't run all the time

This closes #1918.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aabd4a25
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aabd4a25
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aabd4a25

Branch: refs/heads/master
Commit: aabd4a25d2cb8b2a6108be6c16315acee08ee712
Parents: 6bc6f95
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Jun 12 15:53:20 2017 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jun 21 12:47:19 2017 -0400

----------------------------------------------------------------------
 .../nifi/components/ValidationContext.java      |  10 +-
 .../apache/nifi/context/PropertyContext.java    |  41 ++
 .../controller/AbstractControllerService.java   |  18 +-
 .../nifi/controller/ConfigurationContext.java   |  14 +-
 .../apache/nifi/processor/ProcessContext.java   |  12 +-
 .../apache/nifi/reporting/ReportingContext.java |  11 +-
 nifi-assembly/pom.xml                           |  10 +
 .../bootstrap/NotificationServiceManager.java   |   6 +
 .../NotificationInitializationContext.java      |  14 +-
 .../NotificationValidationContext.java          |  10 +
 .../StateProviderInitializationContext.java     |  12 +-
 .../nifi/util/MockConfigurationContext.java     |  10 +
 .../apache/nifi/util/MockProcessContext.java    |   9 +
 .../apache/nifi/util/MockReportingContext.java  |  10 +
 .../apache/nifi/util/MockValidationContext.java |  10 +
 .../reporting/StandardReportingContext.java     |  10 +
 .../scheduling/ConnectableProcessContext.java   |   5 +
 .../service/StandardConfigurationContext.java   |  10 +
 ...ndardStateProviderInitializationContext.java |  10 +
 .../manager/StandardStateManagerProvider.java   | 127 +++++-
 .../nifi/processor/StandardProcessContext.java  |  10 +
 .../processor/StandardSchedulingContext.java    |  10 +
 .../processor/StandardValidationContext.java    |  10 +
 .../controller/TestStandardProcessorNode.java   |  10 +
 .../local/TestWriteAheadLocalStateProvider.java |  10 +
 .../zookeeper/TestZooKeeperStateProvider.java   |  10 +
 .../nifi/mock/MockConfigurationContext.java     |   5 +
 .../apache/nifi/mock/MockProcessContext.java    |   5 +
 .../main/resources/conf/state-management.xml    |  68 +++
 .../nifi-redis-extensions/pom.xml               |  89 ++++
 .../service/RedisConnectionPoolService.java     |  93 ++++
 .../RedisDistributedMapCacheClientService.java  | 327 ++++++++++++++
 .../apache/nifi/redis/state/RedisStateMap.java  | 100 +++++
 .../redis/state/RedisStateMapJsonSerDe.java     |  85 ++++
 .../nifi/redis/state/RedisStateMapSerDe.java    |  44 ++
 .../nifi/redis/state/RedisStateProvider.java    | 299 +++++++++++++
 .../org/apache/nifi/redis/util/RedisAction.java |  30 ++
 .../org/apache/nifi/redis/util/RedisUtils.java  | 428 +++++++++++++++++++
 ...g.apache.nifi.components.state.StateProvider |  15 +
 ...org.apache.nifi.controller.ControllerService |  16 +
 .../nifi/redis/service/FakeRedisProcessor.java  |  53 +++
 ...ITRedisDistributedMapCacheClientService.java | 264 ++++++++++++
 .../service/TestRedisConnectionPoolService.java |  95 ++++
 .../nifi/redis/state/ITRedisStateProvider.java  | 318 ++++++++++++++
 .../redis/state/TestRedisStateMapJsonSerDe.java |  79 ++++
 .../nifi-redis-bundle/nifi-redis-nar/pom.xml    |  46 ++
 .../src/main/resources/META-INF/NOTICE          |  32 ++
 .../nifi-redis-service-api-nar/pom.xml          |  46 ++
 .../src/main/resources/META-INF/LICENSE         | 239 +++++++++++
 .../src/main/resources/META-INF/NOTICE          |  33 ++
 .../nifi-redis-service-api/pom.xml              |  44 ++
 .../apache/nifi/redis/RedisConnectionPool.java  |  44 ++
 .../java/org/apache/nifi/redis/RedisType.java   |  56 +++
 nifi-nar-bundles/nifi-redis-bundle/pom.xml      |  41 ++
 .../processors/standard/WaitNotifyProtocol.java |  10 +-
 nifi-nar-bundles/pom.xml                        |   1 +
 pom.xml                                         |  12 +
 57 files changed, 3372 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
index 66a54fa..444d1bd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
@@ -18,11 +18,12 @@ package org.apache.nifi.components;
 
 import java.util.Map;
 
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.expression.ExpressionLanguageCompiler;
 
-public interface ValidationContext {
+public interface ValidationContext extends PropertyContext {
 
     /**
      * @return the {@link ControllerServiceLookup} which can be used to obtain
@@ -44,13 +45,6 @@ public interface ValidationContext {
     ExpressionLanguageCompiler newExpressionLanguageCompiler();
 
     /**
-     * @param property being validated
-     * @return a PropertyValue that encapsulates the value configured for the
-     * given PropertyDescriptor
-     */
-    PropertyValue getProperty(PropertyDescriptor property);
-
-    /**
      * @param value to make a PropertyValue object for
      * @return a PropertyValue that represents the given value
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java b/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java
new file mode 100644
index 0000000..2771927
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.context;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+
+import java.util.Map;
+
+/**
+ * A context for retrieving a PropertyValue from a PropertyDescriptor.
+ */
+public interface PropertyContext {
+
+    /**
+     * Retrieves the current value set for the given descriptor, if a value is
+     * set - else uses the descriptor to determine the appropriate default value
+     *
+     * @param descriptor to lookup the value of
+     * @return the property value of the given descriptor
+     */
+    PropertyValue getProperty(PropertyDescriptor descriptor);
+
+
+    Map<String,String> getAllProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
index 9762f3e..95f0583 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
@@ -23,7 +23,6 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.reporting.InitializationException;
 
 public abstract class AbstractControllerService extends AbstractConfigurableComponent implements ControllerService {
@@ -33,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
     private ComponentLog logger;
     private StateManager stateManager;
     private volatile ConfigurationContext configurationContext;
+    private volatile boolean enabled = false;
 
     @Override
     public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
@@ -50,7 +50,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
 
     /**
      * @return the {@link ControllerServiceLookup} that was passed to the
-     * {@link #init(ProcessorInitializationContext)} method
+     * {@link #init(ControllerServiceInitializationContext)} method
      */
     protected final ControllerServiceLookup getControllerServiceLookup() {
         return serviceLookup;
@@ -66,6 +66,20 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
     protected void init(final ControllerServiceInitializationContext config) throws InitializationException {
     }
 
+    @OnEnabled
+    public final void enabled() {
+        this.enabled = true;
+    }
+
+    @OnDisabled
+    public final void disabled() {
+        this.enabled = false;
+    }
+
+    public boolean isEnabled() {
+        return this.enabled;
+    }
+
     /**
      * @return the logger that has been provided to the component by the
      * framework in its initialize method

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
index 03965d4..e6b3cb2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
@@ -16,23 +16,17 @@
  */
 package org.apache.nifi.controller;
 
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-
 /**
  * This context is passed to ControllerServices and Reporting Tasks in order
  * to expose their configuration to them.
  */
-public interface ConfigurationContext {
-
-    /**
-     * @param property to retrieve by name
-     * @return the configured value for the property with the given name
-     */
-    PropertyValue getProperty(PropertyDescriptor property);
+public interface ConfigurationContext extends PropertyContext {
 
     /**
      * @return an unmodifiable map of all configured properties for this

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index c112e8a..2bbe06a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -22,6 +22,7 @@ import java.util.Set;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -34,16 +35,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
  * thread-safe.
  * </p>
  */
-public interface ProcessContext {
-
-    /**
-     * Retrieves the current value set for the given descriptor, if a value is
-     * set - else uses the descriptor to determine the appropriate default value
-     *
-     * @param descriptor to lookup the value of
-     * @return the property value of the given descriptor
-     */
-    PropertyValue getProperty(PropertyDescriptor descriptor);
+public interface ProcessContext extends PropertyContext {
 
     /**
      * Retrieves the current value set for the given descriptor, if a value is

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
index f1acfe3..8b3ad56 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
@@ -17,8 +17,8 @@
 package org.apache.nifi.reporting;
 
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 import java.util.Map;
@@ -29,7 +29,7 @@ import java.util.Map;
  * statistics, metrics, and monitoring information, as well as configuration
  * supplied by the user.
  */
-public interface ReportingContext {
+public interface ReportingContext extends PropertyContext {
 
     /**
      * @return a Map of all known {@link PropertyDescriptor}s to their
@@ -40,13 +40,6 @@ public interface ReportingContext {
     Map<PropertyDescriptor, String> getProperties();
 
     /**
-     * @param propertyName descriptor of property to lookup the value of
-     * @return PropertyValue that represents the user-configured value for the given
-     * {@link PropertyDescriptor}
-     */
-    PropertyValue getProperty(PropertyDescriptor propertyName);
-
-    /**
      * @return the {@link EventAccess} object that can be used to obtain
      * information about specific events and reports that have happened
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index bc9992c..8f73ec0 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -476,6 +476,16 @@
             <artifactId>nifi-hwx-schema-registry-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-redis-service-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-redis-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     <profiles>
         <profile>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
index 6203a06..6e91751 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -379,6 +380,11 @@ public class NotificationServiceManager {
                 }
 
                 @Override
+                public Map<String,String> getAllProperties() {
+                    return Collections.unmodifiableMap(propertyValues);
+                }
+
+                @Override
                 public String getIdentifier() {
                     return serviceId;
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
index 88e0445..e505d0b 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
@@ -17,19 +17,9 @@
 package org.apache.nifi.bootstrap.notification;
 
 
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
 
-public interface NotificationInitializationContext {
-
-    /**
-     * Returns the configured value for the given PropertyDescriptor
-     *
-     * @param descriptor the property to fetch the value for
-     * @return the configured value for the given PropertyDescriptor, or the default value for the PropertyDescriptor
-     *         if no value has been configured.
-     */
-    PropertyValue getProperty(PropertyDescriptor descriptor);
+public interface NotificationInitializationContext extends PropertyContext {
 
     /**
      * @return the identifier for the NotificationService

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
index 99d3b23..6d3ef53 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.bootstrap.notification;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -76,6 +77,15 @@ public class NotificationValidationContext implements ValidationContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public String getAnnotationData() {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
index 4182490..5b90a1c 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
@@ -23,13 +23,14 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.logging.ComponentLog;
 
 /**
  * This interface defines an initialization context that is passed to a {@link StateProvider} when it
  * is initialized.
  */
-public interface StateProviderInitializationContext {
+public interface StateProviderInitializationContext extends PropertyContext {
     /**
      * @return the identifier of the StateProvider
      */
@@ -41,15 +42,6 @@ public interface StateProviderInitializationContext {
     Map<PropertyDescriptor, PropertyValue> getProperties();
 
     /**
-     * Returns the configured value for the given property
-     *
-     * @param property the property to retrieve the value for
-     *
-     * @return the configured value for the property.
-     */
-    PropertyValue getProperty(PropertyDescriptor property);
-
-    /**
      * @return the SSL Context that should be used to communicate with remote resources,
      *         or <code>null</code> if no SSLContext has been configured
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
index 74b84ad..91d805e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.util;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -69,6 +70,15 @@ public class MockConfigurationContext implements ConfigurationContext {
         return new HashMap<>(this.properties);
     }
 
+    @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
     private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) {
         if (service == null) {
             return property;

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 8cbe1ac..8651241 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -212,6 +212,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
         }
     }
 
+    @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
     /**
      * Validates the current properties, returning ValidationResults for any
      * invalid properties. All processor defined properties will be validated.

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
index 26ad590..f65bc3e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.util;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -59,6 +60,15 @@ public class MockReportingContext extends MockControllerServiceLookup implements
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public PropertyValue getProperty(final PropertyDescriptor property) {
         final String configuredValue = properties.get(property);
         return new MockPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this, variableRegistry);

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
index fbd2a36..564ec54 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.util;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -89,6 +90,15 @@ public class MockValidationContext implements ValidationContext, ControllerServi
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public String getAnnotationData() {
         return context.getAnnotationData();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 8f8b231..62183cc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -40,6 +40,7 @@ import org.apache.nifi.reporting.Severity;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -108,6 +109,15 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public PropertyValue getProperty(final PropertyDescriptor property) {
         final String configuredValue = properties.get(property);
         return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this, preparedQueries.get(property), variableRegistry);

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 0d755b0..3116401 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -195,6 +195,11 @@ public class ConnectableProcessContext implements ProcessContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        return new HashMap<>();
+    }
+
+    @Override
     public Map<PropertyDescriptor, String> getProperties() {
         return new HashMap<>();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
index 61db819..c188d75 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.service;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -82,6 +83,15 @@ public class StandardConfigurationContext implements ConfigurationContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public String getSchedulingPeriod() {
         return schedulingPeriod;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
index d86a120..ace92c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.state;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import javax.net.ssl.SSLContext;
@@ -47,6 +48,15 @@ public class StandardStateProviderInitializationContext implements StateProvider
     }
 
     @Override
+    public Map<String,String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public PropertyValue getProperty(final PropertyDescriptor property) {
         return properties.get(property);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index d63ae00..b365de2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -38,6 +38,7 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
 import org.apache.nifi.controller.state.ConfigParseException;
@@ -48,6 +49,7 @@ import org.apache.nifi.controller.state.config.StateProviderConfiguration;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SimpleProcessLogger;
 import org.apache.nifi.processor.StandardValidationContext;
 import org.apache.nifi.registry.VariableRegistry;
@@ -232,7 +234,7 @@ public class StandardStateManagerProvider implements StateManagerProvider{
 
             Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
             final Class<? extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class);
-            return mgrClass.newInstance();
+            return withNarClassLoader(mgrClass.newInstance());
         } finally {
             if (ctxClassLoader != null) {
                 Thread.currentThread().setContextClassLoader(ctxClassLoader);
@@ -241,6 +243,129 @@ public class StandardStateManagerProvider implements StateManagerProvider{
     }
 
     /**
+     * Wrap the provider so that all method calls set the context class loader to the NAR's class loader before
+     * executing the actual provider.
+     *
+     * @param stateProvider the base provider to wrap
+     * @return the wrapped provider
+     */
+    private static StateProvider withNarClassLoader(final StateProvider stateProvider) {
+        return new StateProvider() {
+            @Override
+            public void initialize(StateProviderInitializationContext context) throws IOException {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.initialize(context);
+                }
+            }
+
+            @Override
+            public void shutdown() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.shutdown();
+                }
+            }
+
+            @Override
+            public void setState(Map<String, String> state, String componentId) throws IOException {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.setState(state, componentId);
+                }
+            }
+
+            @Override
+            public StateMap getState(String componentId) throws IOException {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.getState(componentId);
+                }
+            }
+
+            @Override
+            public boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.replace(oldValue, newValue, componentId);
+                }
+            }
+
+            @Override
+            public void clear(String componentId) throws IOException {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.clear(componentId);
+                }
+            }
+
+            @Override
+            public void onComponentRemoved(String componentId) throws IOException {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.onComponentRemoved(componentId);
+                }
+            }
+
+            @Override
+            public void enable() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.enable();
+                }
+            }
+
+            @Override
+            public void disable() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.disable();
+                }
+            }
+
+            @Override
+            public boolean isEnabled() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.isEnabled();
+                }
+            }
+
+            @Override
+            public Scope[] getSupportedScopes() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.getSupportedScopes();
+                }
+            }
+
+            @Override
+            public Collection<ValidationResult> validate(ValidationContext context) {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.validate(context);
+                }
+            }
+
+            @Override
+            public PropertyDescriptor getPropertyDescriptor(String name) {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.getPropertyDescriptor(name);
+                }
+            }
+
+            @Override
+            public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    stateProvider.onPropertyModified(descriptor, oldValue, newValue);
+                }
+            }
+
+            @Override
+            public List<PropertyDescriptor> getPropertyDescriptors() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.getPropertyDescriptors();
+                }
+            }
+
+            @Override
+            public String getIdentifier() {
+                try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+                    return stateProvider.getIdentifier();
+                }
+            }
+        };
+    }
+
+    /**
      * Returns the State Manager that has been created for the given component ID, or <code>null</code> if none exists
      *
      * @return the StateManager that can be used by the component with the given ID, or <code>null</code> if none exists

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 83906e2..2714392 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processor;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -126,6 +127,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public String encrypt(final String unencrypted) {
         return encryptor.encrypt(unencrypted);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 86518b8..1f5cfee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processor;
 
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -99,6 +100,15 @@ public class StandardSchedulingContext implements SchedulingContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public String encrypt(final String unencrypted) {
         return processContext.encrypt(unencrypted);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
index dfc7965..662169c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processor;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -119,6 +120,15 @@ public class StandardValidationContext implements ValidationContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        final Map<String,String> propValueMap = new LinkedHashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+            propValueMap.put(entry.getKey().getName(), entry.getValue());
+        }
+        return propValueMap;
+    }
+
+    @Override
     public String getAnnotationData() {
         return annotationData;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index c248d25..32b6f53 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -62,6 +62,7 @@ import java.net.URL;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
@@ -513,6 +514,15 @@ public class TestStandardProcessorNode {
                     }
 
                     @Override
+                    public Map<String, String> getAllProperties() {
+                        final Map<String,String> propValueMap = new LinkedHashMap<>();
+                        for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+                            propValueMap.put(entry.getKey().getName(), entry.getValue());
+                        }
+                        return propValueMap;
+                    }
+
+                    @Override
                     public String getAnnotationData() {
                         return null;
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
index d2e4a05..41b7d9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
@@ -20,6 +20,7 @@ package org.apache.nifi.controller.state.providers.local;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -62,6 +63,15 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider
             }
 
             @Override
+            public Map<String,String> getAllProperties() {
+                final Map<String,String> propValueMap = new LinkedHashMap<>();
+                for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
+                    propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
+                }
+                return propValueMap;
+            }
+
+            @Override
             public PropertyValue getProperty(final PropertyDescriptor property) {
                 final PropertyValue prop = properties.get(property);
                 if (prop == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
index d09ee1f..037b350 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.state.providers.zookeeper;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import javax.net.ssl.SSLContext;
@@ -76,6 +77,15 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
             }
 
             @Override
+            public Map<String,String> getAllProperties() {
+                final Map<String,String> propValueMap = new LinkedHashMap<>();
+                for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
+                    propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
+                }
+                return propValueMap;
+            }
+
+            @Override
             public PropertyValue getProperty(final PropertyDescriptor property) {
                 final String prop = properties.get(property);
                 return new StandardPropertyValue(prop, null);

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
index d1e73fb..d9a1b37 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
@@ -32,6 +32,11 @@ public class MockConfigurationContext implements ConfigurationContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
     public Map<PropertyDescriptor, String> getProperties() {
         return Collections.emptyMap();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
index cf2e2cf..cb17324 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
@@ -65,6 +65,11 @@ public class MockProcessContext implements ProcessContext {
     }
 
     @Override
+    public Map<String, String> getAllProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
     public String encrypt(String unencrypted) {
         return unencrypted;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index d7631c2..dcd7ee6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -63,4 +63,72 @@
         <property name="Session Timeout">10 seconds</property>
         <property name="Access Control">Open</property>
     </cluster-provider>
+
+    <!--
+        Cluster State Provider that stores state in Redis. This can be used as an alternative to the ZooKeeper State Provider.
+
+        This provider requires the following properties:
+
+            Redis Mode - The type of Redis instance:
+                            - Standalone
+                            - Sentinel
+                            - Cluster (currently not supported for state-management due to use of WATCH command which Redis does not support in clustered mode)
+
+            Connection String - The connection string for Redis.
+                        - In a standalone instance this value will be of the form hostname:port.
+                        - In a sentinel instance this value will be the comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3.
+                        - In a clustered instance this value will be the comma-separated list of cluster masters, such as host1:port1,host2:port2,host3:port3.
+
+        This provider has the following optional properties:
+
+            Key Prefix - The prefix for each key stored by this state provider. When sharing a single Redis across multiple NiFi instances, setting a unique
+                        value for the Key Prefix will make it easier to identify which instances the keys came from (default nifi/components/).
+
+            Database Index - The database index to be used by connections created from this connection pool.
+                        See the databases property in redis.conf, by default databases 0-15 will be available.
+
+            Communication Timeout - The timeout to use when attempting to communicate with Redis.
+
+            Cluster Max Redirects - The maximum number of redirects that can be performed when clustered.
+
+            Sentinel Master - The name of the sentinel master, required when Redis Mode is set to Sentinel.
+
+            Password - The password used to authenticate to the Redis server. See the requirepass property in redis.conf.
+
+            Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
+                        A negative value indicates that there is no limit.
+
+            Pool - Max Idle - The maximum number of idle connections that can be held in the pool, or a negative value if there is no limit.
+
+            Pool - Min Idle - The target for the minimum number of idle connections to maintain in the pool. If the configured value of Min Idle is
+                    greater than the configured value for Max Idle, then the value of Max Idle will be used instead.
+
+            Pool - Block When Exhausted - Whether or not clients should block and wait when trying to obtain a connection from the pool when the pool
+                    has no available connections. Setting this to false means an error will occur immediately when a client requests a connection and
+                    none are available.
+
+            Pool - Max Wait Time - The amount of time to wait for an available connection when Block When Exhausted is set to true.
+
+            Pool - Min Evictable Idle Time - The minimum amount of time an object may sit idle in the pool before it is eligible for eviction.
+
+            Pool - Time Between Eviction Runs - The amount of time between attempting to evict idle connections from the pool.
+
+            Pool - Num Tests Per Eviction Run - The number of connections to test per eviction attempt. A negative value indicates that all connections should be tested.
+
+            Pool - Test On Create - Whether or not connections should be tested upon creation (default false).
+
+            Pool - Test On Borrow - Whether or not connections should be tested upon borrowing from the pool (default false).
+
+            Pool - Test On Return - Whether or not connections should be tested upon returning to the pool (default false).
+
+            Pool - Test While Idle - Whether or not connections should be tested while idle (default true).
+
+        <cluster-provider>
+            <id>redis-provider</id>
+            <class>org.apache.nifi.redis.state.RedisStateProvider</class>
+            <property name="Redis Mode">Standalone</property>
+            <property name="Connection String">localhost:6379</property>
+        </cluster-provider>
+    -->
+
 </stateManagement>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
new file mode 100644
index 0000000..e59220d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-redis-bundle</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-redis-extensions</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- Provided deps from nifi-redis-service-api -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-redis-service-api</artifactId>
+            <version>1.4.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.data</groupId>
+            <artifactId>spring-data-redis</artifactId>
+            <version>${spring.data.redis.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>2.9.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <!-- End Provided deps from nifi-redis-service-api -->
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.github.kstyrc</groupId>
+            <artifactId>embedded-redis</artifactId>
+            <version>0.6</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
new file mode 100644
index 0000000..68169f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.service;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+
+import java.util.Collection;
+import java.util.List;
+
+@Tags({"redis", "cache"})
+@CapabilityDescription("A service that provides connections to Redis.")
+public class RedisConnectionPoolService extends AbstractControllerService implements RedisConnectionPool {
+
+    private volatile PropertyContext context;
+    private volatile RedisType redisType;
+    private volatile JedisConnectionFactory connectionFactory;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        return RedisUtils.validate(validationContext);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+
+        final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
+        this.redisType = RedisType.fromDisplayName(redisMode);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        if (connectionFactory != null) {
+            connectionFactory.destroy();
+            connectionFactory = null;
+            redisType = null;
+            context = null;
+        }
+    }
+
+    @Override
+    public RedisType getRedisType() {
+        return redisType;
+    }
+
+    @Override
+    public RedisConnection getConnection() {
+        if (connectionFactory == null) {
+            synchronized (this) {
+                if (connectionFactory == null) {
+                    connectionFactory = RedisUtils.createConnectionFactory(context, getLogger());
+                }
+            }
+        }
+
+        return connectionFactory.getConnection();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
new file mode 100644
index 0000000..94b195c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.service;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.redis.util.RedisAction;
+import org.apache.nifi.util.Tuple;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.ScanOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A distributed map cache client that stores its entries in Redis. Every operation obtains a connection from the
+ * configured {@link RedisConnectionPool}, runs against that connection, and closes it afterwards.
+ */
+@Tags({ "redis", "distributed", "cache", "map" })
+@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
+        "the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
+        "can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " +
+        "provide high-availability configurations.")
+public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
+
+    public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
+            .name("redis-connection-pool")
+            .displayName("Redis Connection Pool")
+            .identifiesControllerService(RedisConnectionPool.class)
+            .required(true)
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(REDIS_CONNECTION_POOL);
+        PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
+    }
+
+    // set in onEnabled and cleared in onDisabled
+    private volatile RedisConnectionPool redisConnectionPool;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Rejects a connection pool that reports {@link RedisType#CLUSTER}.
+     *
+     * @param validationContext the context used to look up the configured connection pool
+     * @return a single invalid result if the pool is clustered, otherwise an empty collection
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final RedisConnectionPool redisConnectionPool = validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+        // a reference comparison is already null-safe, so a pool that reports no type simply passes
+        if (redisConnectionPool != null && redisConnectionPool.getRedisType() == RedisType.CLUSTER) {
+            results.add(new ValidationResult.Builder()
+                    .subject(REDIS_CONNECTION_POOL.getDisplayName())
+                    .valid(false)
+                    .explanation(REDIS_CONNECTION_POOL.getDisplayName()
+                            + " is configured in clustered mode, and this service requires a non-clustered Redis")
+                    .build());
+        }
+
+        return results;
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        this.redisConnectionPool = null;
+    }
+
+    /**
+     * Stores the value under the key using SETNX.
+     *
+     * @return the result of the SETNX call for the serialized key and value
+     * @throws IOException if serialization or the Redis action fails with an IOException
+     */
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
+            return redisConnection.setNX(kv.getKey(), kv.getValue());
+        });
+    }
+
+    /**
+     * Reads the current value of the key and attempts a SETNX inside a WATCH/MULTI/EXEC transaction, retrying for as
+     * long as the transaction yields no results and this service is enabled.
+     *
+     * @return null if the SETNX stored the new value, otherwise the deserialized value that was read before the
+     *         transaction; also null if the service stops being enabled before a transaction yields a result
+     * @throws IOException if the first transaction result is not a Boolean, or if (de)serialization fails
+     */
+    @Override
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
+            do {
+                // start a watch on the key and retrieve the current value
+                redisConnection.watch(kv.getKey());
+                final byte[] existingValue = redisConnection.get(kv.getKey());
+
+                // start a transaction and perform the put-if-absent
+                redisConnection.multi();
+                redisConnection.setNX(kv.getKey(), kv.getValue());
+
+                // execute the transaction
+                final List<Object> results = redisConnection.exec();
+
+                // if the results list was null or empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry
+                // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation
+                if (results != null && !results.isEmpty()) {
+                    final Object firstResult = results.get(0);
+                    if (firstResult instanceof Boolean) {
+                        final Boolean absent = (Boolean) firstResult;
+                        return absent ? null : valueDeserializer.deserialize(existingValue);
+                    } else {
+                        // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop
+                        throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got "
+                                + firstResult.getClass().getName() + " with value " + firstResult.toString());
+                    }
+                }
+            } while (isEnabled());
+
+            return null;
+        });
+    }
+
+
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final byte[] k = serialize(key, keySerializer);
+            return redisConnection.exists(k);
+        });
+    }
+
+    @Override
+    public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        withConnection(redisConnection -> {
+            final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
+            redisConnection.set(kv.getKey(), kv.getValue());
+            // the action must produce a value; there is nothing meaningful to return for a plain set
+            return null;
+        });
+    }
+
+    /**
+     * Looks up the key and hands the raw bytes to the deserializer. The bytes are passed through as returned by
+     * Redis, so the deserializer receives null when the lookup returns null.
+     */
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final byte[] k = serialize(key, keySerializer);
+            final byte[] v = redisConnection.get(k);
+            return valueDeserializer.deserialize(v);
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        // nothing to do
+    }
+
+    /**
+     * Deletes the key.
+     *
+     * @return true if DEL reported at least one removed key
+     */
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final byte[] k = serialize(key, keySerializer);
+            final long numRemoved = redisConnection.del(k);
+            return numRemoved > 0;
+        });
+    }
+
+    /**
+     * Scans for keys matching the given pattern and deletes them in batches.
+     *
+     * @param regex the pattern handed to the MATCH option of SCAN; despite the parameter name, Redis evaluates
+     *              MATCH patterns as glob-style patterns rather than regular expressions
+     * @return the sum of the counts reported by the DEL calls
+     */
+    @Override
+    public long removeByPattern(final String regex) throws IOException {
+        return withConnection(redisConnection -> {
+            long deletedCount = 0;
+            final List<byte[]> batchKeys = new ArrayList<>();
+
+            // delete keys in batches of 1000 using the cursor
+            final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
+            while (cursor.hasNext()) {
+                batchKeys.add(cursor.next());
+
+                if (batchKeys.size() == 1000) {
+                    deletedCount += redisConnection.del(getKeys(batchKeys));
+                    batchKeys.clear();
+                }
+            }
+
+            // delete any left-over keys if some were added to the batch but never reached 1000
+            if (batchKeys.size() > 0) {
+                deletedCount += redisConnection.del(getKeys(batchKeys));
+                batchKeys.clear();
+            }
+
+            return deletedCount;
+        });
+    }
+
+    /**
+     *  Convert the list of all keys to an array.
+     */
+    private byte[][] getKeys(final List<byte[]> keys) {
+        return keys.toArray(new byte[keys.size()][]);
+    }
+
+    // ----------------- Methods from AtomicDistributedMapCacheClient ------------------------
+
+    /**
+     * Fetches the entry for the key.
+     *
+     * @return null if Redis holds no value for the key, otherwise an entry whose revision is the raw value bytes
+     */
+    @Override
+    public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final byte[] k = serialize(key, keySerializer);
+
+            final byte[] v = redisConnection.get(k);
+            if (v == null) {
+                return null;
+            }
+
+            // for Redis we are going to use the raw bytes of the value as the revision
+            return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(v), v);
+        });
+    }
+
+    /**
+     * Compare-and-set: writes the entry's value only if the bytes currently stored under the key equal the entry's
+     * revision (an entry without a revision matches a key that currently has no value).
+     *
+     * @return true if the transaction produced a result, i.e. the new value was written
+     */
+    @Override
+    public <K, V> boolean replace(final AtomicCacheEntry<K, V, byte[]> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        return withConnection(redisConnection -> {
+            final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
+            final byte[] k = kv.getKey();
+            final byte[] newVal = kv.getValue();
+
+            // the revision of the cache entry holds the value of the key from a previous fetch
+            final byte[] prevVal = entry.getRevision().orElse(null);
+
+            // start a watch on the key and retrieve the current value
+            redisConnection.watch(k);
+            final byte[] currValue = redisConnection.get(k);
+
+            // start a transaction
+            redisConnection.multi();
+
+            // compare-and-set
+            if (Arrays.equals(prevVal, currValue)) {
+                // if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true,
+                // status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded
+                redisConnection.getSet(k, newVal);
+            }
+
+            // execute the transaction
+            final List<Object> results = redisConnection.exec();
+
+            // if we have a result then the replace succeeded; a null or empty list means nothing was written
+            return results != null && !results.isEmpty();
+        });
+    }
+
+    // ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------
+
+    /**
+     * Serializes the key and the value into separate byte arrays.
+     *
+     * @return a tuple holding the serialized key and the serialized value
+     */
+    private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        keySerializer.serialize(key, out);
+        final byte[] k = out.toByteArray();
+
+        // reuse the same buffer for the value
+        out.reset();
+
+        valueSerializer.serialize(value, out);
+        final byte[] v = out.toByteArray();
+
+        return new Tuple<>(k, v);
+    }
+
+    /**
+     * Serializes the key into a byte array.
+     */
+    private <K> byte[] serialize(final K key, final Serializer<K> keySerializer) throws IOException {
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        keySerializer.serialize(key, out);
+        return out.toByteArray();
+    }
+
+    /**
+     * Runs the action against a connection obtained from the pool and always closes that connection afterwards.
+     * A failure while closing is logged as a warning and does not replace the action's result or exception.
+     *
+     * @param action the work to perform with the connection
+     * @return whatever the action returns
+     * @throws IOException if the action throws one
+     */
+    private <T> T withConnection(final RedisAction<T> action) throws IOException {
+        RedisConnection redisConnection = null;
+        try {
+            redisConnection = redisConnectionPool.getConnection();
+            return action.execute(redisConnection);
+        } finally {
+           if (redisConnection != null) {
+               try {
+                   redisConnection.close();
+               } catch (Exception e) {
+                   getLogger().warn("Error closing connection: " + e.getMessage(), e);
+               }
+           }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java
new file mode 100644
index 0000000..068b7a6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.state;
+
+import org.apache.nifi.components.state.StateMap;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * A StateMap implementation for RedisStateProvider.
+ *
+ * Instances are immutable: the state values are copied into an unmodifiable sorted map at construction time.
+ * Use the {@link Builder} to create instances.
+ */
+public class RedisStateMap implements StateMap {
+
+    /** Version used when the builder was not given one. */
+    public static final Long DEFAULT_VERSION = Long.valueOf(-1L);
+    /** Encoding version used when the builder was not given one. */
+    public static final Integer DEFAULT_ENCODING = Integer.valueOf(1);
+
+    private final Long version;
+    private final Integer encodingVersion;
+    private final Map<String,String> stateValues;
+
+    private RedisStateMap(final Builder builder) {
+        // check before copying so that a missing map fails with a descriptive message
+        Objects.requireNonNull(builder.stateValues, "State Values must be non-null");
+
+        // version and encoding fall back to the defaults, so they can never end up null
+        this.version = builder.version == null ? DEFAULT_VERSION : builder.version;
+        this.encodingVersion = builder.encodingVersion == null ? DEFAULT_ENCODING : builder.encodingVersion;
+        this.stateValues = Collections.unmodifiableMap(new TreeMap<>(builder.stateValues));
+    }
+
+    /**
+     * @return the version given to the builder, or {@link #DEFAULT_VERSION} if none was given
+     */
+    @Override
+    public long getVersion() {
+        return version;
+    }
+
+    /**
+     * @param key the name of the state value
+     * @return the value stored under the key, or null if there is none
+     */
+    @Override
+    public String get(String key) {
+        return stateValues.get(key);
+    }
+
+    /**
+     * @return an unmodifiable view of the state values, sorted by key
+     */
+    @Override
+    public Map<String, String> toMap() {
+        return stateValues;
+    }
+
+    /**
+     * @return the encoding version given to the builder, or {@link #DEFAULT_ENCODING} if none was given
+     */
+    public Integer getEncodingVersion() {
+        return encodingVersion;
+    }
+
+    /**
+     * Collects the version, encoding version and state values for a {@link RedisStateMap}.
+     */
+    public static class Builder {
+
+        private Long version;
+        private Integer encodingVersion;
+        private Map<String,String> stateValues = new TreeMap<>();
+
+        public Builder version(final Long version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder encodingVersion(final Integer encodingVersion) {
+            this.encodingVersion = encodingVersion;
+            return this;
+        }
+
+        /**
+         * Adds a single state value, replacing any value previously stored under the same name.
+         */
+        public Builder stateValue(final String name, String value) {
+            stateValues.put(name, value);
+            return this;
+        }
+
+        /**
+         * Replaces all state values collected so far with a copy of the given map; null clears them.
+         */
+        public Builder stateValues(final Map<String,String> stateValues) {
+            this.stateValues.clear();
+            if (stateValues != null) {
+                this.stateValues.putAll(stateValues);
+            }
+            return this;
+        }
+
+        public RedisStateMap build() {
+            return new RedisStateMap(this);
+        }
+    }
+
+}