You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/06/21 16:48:08 UTC
[3/3] nifi git commit: NIFI-4043 Initial commit of nifi-redis-bundle
NIFI-4043 Initial commit of nifi-redis-bundle
NIFI-4061 Initial version of RedisStateProvider
- Adding PropertyContext and updating existing contexts to extend it
- Added embedded Redis for unit testing
- Added wrapped StateProvider with NAR ClassLoader in StandardStateManagerProvider
- Updating state-management.xml with config for RedisStateProvider
- Renaming tests that use RedisServer to be IT tests so they don't run all the time
This closes #1918.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/aabd4a25
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/aabd4a25
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/aabd4a25
Branch: refs/heads/master
Commit: aabd4a25d2cb8b2a6108be6c16315acee08ee712
Parents: 6bc6f95
Author: Bryan Bende <bb...@apache.org>
Authored: Mon Jun 12 15:53:20 2017 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jun 21 12:47:19 2017 -0400
----------------------------------------------------------------------
.../nifi/components/ValidationContext.java | 10 +-
.../apache/nifi/context/PropertyContext.java | 41 ++
.../controller/AbstractControllerService.java | 18 +-
.../nifi/controller/ConfigurationContext.java | 14 +-
.../apache/nifi/processor/ProcessContext.java | 12 +-
.../apache/nifi/reporting/ReportingContext.java | 11 +-
nifi-assembly/pom.xml | 10 +
.../bootstrap/NotificationServiceManager.java | 6 +
.../NotificationInitializationContext.java | 14 +-
.../NotificationValidationContext.java | 10 +
.../StateProviderInitializationContext.java | 12 +-
.../nifi/util/MockConfigurationContext.java | 10 +
.../apache/nifi/util/MockProcessContext.java | 9 +
.../apache/nifi/util/MockReportingContext.java | 10 +
.../apache/nifi/util/MockValidationContext.java | 10 +
.../reporting/StandardReportingContext.java | 10 +
.../scheduling/ConnectableProcessContext.java | 5 +
.../service/StandardConfigurationContext.java | 10 +
...ndardStateProviderInitializationContext.java | 10 +
.../manager/StandardStateManagerProvider.java | 127 +++++-
.../nifi/processor/StandardProcessContext.java | 10 +
.../processor/StandardSchedulingContext.java | 10 +
.../processor/StandardValidationContext.java | 10 +
.../controller/TestStandardProcessorNode.java | 10 +
.../local/TestWriteAheadLocalStateProvider.java | 10 +
.../zookeeper/TestZooKeeperStateProvider.java | 10 +
.../nifi/mock/MockConfigurationContext.java | 5 +
.../apache/nifi/mock/MockProcessContext.java | 5 +
.../main/resources/conf/state-management.xml | 68 +++
.../nifi-redis-extensions/pom.xml | 89 ++++
.../service/RedisConnectionPoolService.java | 93 ++++
.../RedisDistributedMapCacheClientService.java | 327 ++++++++++++++
.../apache/nifi/redis/state/RedisStateMap.java | 100 +++++
.../redis/state/RedisStateMapJsonSerDe.java | 85 ++++
.../nifi/redis/state/RedisStateMapSerDe.java | 44 ++
.../nifi/redis/state/RedisStateProvider.java | 299 +++++++++++++
.../org/apache/nifi/redis/util/RedisAction.java | 30 ++
.../org/apache/nifi/redis/util/RedisUtils.java | 428 +++++++++++++++++++
...g.apache.nifi.components.state.StateProvider | 15 +
...org.apache.nifi.controller.ControllerService | 16 +
.../nifi/redis/service/FakeRedisProcessor.java | 53 +++
...ITRedisDistributedMapCacheClientService.java | 264 ++++++++++++
.../service/TestRedisConnectionPoolService.java | 95 ++++
.../nifi/redis/state/ITRedisStateProvider.java | 318 ++++++++++++++
.../redis/state/TestRedisStateMapJsonSerDe.java | 79 ++++
.../nifi-redis-bundle/nifi-redis-nar/pom.xml | 46 ++
.../src/main/resources/META-INF/NOTICE | 32 ++
.../nifi-redis-service-api-nar/pom.xml | 46 ++
.../src/main/resources/META-INF/LICENSE | 239 +++++++++++
.../src/main/resources/META-INF/NOTICE | 33 ++
.../nifi-redis-service-api/pom.xml | 44 ++
.../apache/nifi/redis/RedisConnectionPool.java | 44 ++
.../java/org/apache/nifi/redis/RedisType.java | 56 +++
nifi-nar-bundles/nifi-redis-bundle/pom.xml | 41 ++
.../processors/standard/WaitNotifyProtocol.java | 10 +-
nifi-nar-bundles/pom.xml | 1 +
pom.xml | 12 +
57 files changed, 3372 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
index 66a54fa..444d1bd 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java
@@ -18,11 +18,12 @@ package org.apache.nifi.components;
import java.util.Map;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
-public interface ValidationContext {
+public interface ValidationContext extends PropertyContext {
/**
* @return the {@link ControllerServiceLookup} which can be used to obtain
@@ -44,13 +45,6 @@ public interface ValidationContext {
ExpressionLanguageCompiler newExpressionLanguageCompiler();
/**
- * @param property being validated
- * @return a PropertyValue that encapsulates the value configured for the
- * given PropertyDescriptor
- */
- PropertyValue getProperty(PropertyDescriptor property);
-
- /**
* @param value to make a PropertyValue object for
* @return a PropertyValue that represents the given value
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java b/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java
new file mode 100644
index 0000000..2771927
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/context/PropertyContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.context;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+
+import java.util.Map;
+
+/**
+ * A context for retrieving a PropertyValue from a PropertyDescriptor.
+ */
+public interface PropertyContext {
+
+ /**
+ * Retrieves the current value set for the given descriptor, if a value is
+ * set - else uses the descriptor to determine the appropriate default value
+ *
+ * @param descriptor to lookup the value of
+ * @return the property value of the given descriptor
+ */
+ PropertyValue getProperty(PropertyDescriptor descriptor);
+
+
+ Map<String,String> getAllProperties();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
index 9762f3e..95f0583 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java
@@ -23,7 +23,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.reporting.InitializationException;
public abstract class AbstractControllerService extends AbstractConfigurableComponent implements ControllerService {
@@ -33,6 +32,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
private ComponentLog logger;
private StateManager stateManager;
private volatile ConfigurationContext configurationContext;
+ private volatile boolean enabled = false;
@Override
public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException {
@@ -50,7 +50,7 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
/**
* @return the {@link ControllerServiceLookup} that was passed to the
- * {@link #init(ProcessorInitializationContext)} method
+ * {@link #init(ControllerServiceInitializationContext)} method
*/
protected final ControllerServiceLookup getControllerServiceLookup() {
return serviceLookup;
@@ -66,6 +66,20 @@ public abstract class AbstractControllerService extends AbstractConfigurableComp
protected void init(final ControllerServiceInitializationContext config) throws InitializationException {
}
+ @OnEnabled
+ public final void enabled() {
+ this.enabled = true;
+ }
+
+ @OnDisabled
+ public final void disabled() {
+ this.enabled = false;
+ }
+
+ public boolean isEnabled() {
+ return this.enabled;
+ }
+
/**
* @return the logger that has been provided to the component by the
* framework in its initialize method
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
index 03965d4..e6b3cb2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java
@@ -16,23 +16,17 @@
*/
package org.apache.nifi.controller;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-
/**
* This context is passed to ControllerServices and Reporting Tasks in order
* to expose their configuration to them.
*/
-public interface ConfigurationContext {
-
- /**
- * @param property to retrieve by name
- * @return the configured value for the property with the given name
- */
- PropertyValue getProperty(PropertyDescriptor property);
+public interface ConfigurationContext extends PropertyContext {
/**
* @return an unmodifiable map of all configured properties for this
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
index c112e8a..2bbe06a 100644
--- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java
@@ -22,6 +22,7 @@ import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceLookup;
/**
@@ -34,16 +35,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
* thread-safe.
* </p>
*/
-public interface ProcessContext {
-
- /**
- * Retrieves the current value set for the given descriptor, if a value is
- * set - else uses the descriptor to determine the appropriate default value
- *
- * @param descriptor to lookup the value of
- * @return the property value of the given descriptor
- */
- PropertyValue getProperty(PropertyDescriptor descriptor);
+public interface ProcessContext extends PropertyContext {
/**
* Retrieves the current value set for the given descriptor, if a value is
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
index f1acfe3..8b3ad56 100644
--- a/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
+++ b/nifi-api/src/main/java/org/apache/nifi/reporting/ReportingContext.java
@@ -17,8 +17,8 @@
package org.apache.nifi.reporting;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import java.util.Map;
@@ -29,7 +29,7 @@ import java.util.Map;
* statistics, metrics, and monitoring information, as well as configuration
* supplied by the user.
*/
-public interface ReportingContext {
+public interface ReportingContext extends PropertyContext {
/**
* @return a Map of all known {@link PropertyDescriptor}s to their
@@ -40,13 +40,6 @@ public interface ReportingContext {
Map<PropertyDescriptor, String> getProperties();
/**
- * @param propertyName descriptor of property to lookup the value of
- * @return PropertyValue that represents the user-configured value for the given
- * {@link PropertyDescriptor}
- */
- PropertyValue getProperty(PropertyDescriptor propertyName);
-
- /**
* @return the {@link EventAccess} object that can be used to obtain
* information about specific events and reports that have happened
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index bc9992c..8f73ec0 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -476,6 +476,16 @@
<artifactId>nifi-hwx-schema-registry-nar</artifactId>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-redis-service-api-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-redis-nar</artifactId>
+ <type>nar</type>
+ </dependency>
</dependencies>
<profiles>
<profile>
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
index 6203a06..6e91751 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/NotificationServiceManager.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -379,6 +380,11 @@ public class NotificationServiceManager {
}
@Override
+ public Map<String,String> getAllProperties() {
+ return Collections.unmodifiableMap(propertyValues);
+ }
+
+ @Override
public String getIdentifier() {
return serviceId;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
index 88e0445..e505d0b 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationInitializationContext.java
@@ -17,19 +17,9 @@
package org.apache.nifi.bootstrap.notification;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
-public interface NotificationInitializationContext {
-
- /**
- * Returns the configured value for the given PropertyDescriptor
- *
- * @param descriptor the property to fetch the value for
- * @return the configured value for the given PropertyDescriptor, or the default value for the PropertyDescriptor
- * if no value has been configured.
- */
- PropertyValue getProperty(PropertyDescriptor descriptor);
+public interface NotificationInitializationContext extends PropertyContext {
/**
* @return the identifier for the NotificationService
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
index 99d3b23..6d3ef53 100644
--- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
+++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/notification/NotificationValidationContext.java
@@ -17,6 +17,7 @@
package org.apache.nifi.bootstrap.notification;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -76,6 +77,15 @@ public class NotificationValidationContext implements ValidationContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
+ @Override
public String getAnnotationData() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
index 4182490..5b90a1c 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/state/StateProviderInitializationContext.java
@@ -23,13 +23,14 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.logging.ComponentLog;
/**
* This interface defines an initialization context that is passed to a {@link StateProvider} when it
* is initialized.
*/
-public interface StateProviderInitializationContext {
+public interface StateProviderInitializationContext extends PropertyContext {
/**
* @return the identifier of the StateProvider
*/
@@ -41,15 +42,6 @@ public interface StateProviderInitializationContext {
Map<PropertyDescriptor, PropertyValue> getProperties();
/**
- * Returns the configured value for the given property
- *
- * @param property the property to retrieve the value for
- *
- * @return the configured value for the property.
- */
- PropertyValue getProperty(PropertyDescriptor property);
-
- /**
* @return the SSL Context that should be used to communicate with remote resources,
* or <code>null</code> if no SSLContext has been configured
*/
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
index 74b84ad..91d805e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockConfigurationContext.java
@@ -17,6 +17,7 @@
package org.apache.nifi.util;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -69,6 +70,15 @@ public class MockConfigurationContext implements ConfigurationContext {
return new HashMap<>(this.properties);
}
+ @Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
private PropertyDescriptor getActualDescriptor(final PropertyDescriptor property) {
if (service == null) {
return property;
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
index 8cbe1ac..8651241 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java
@@ -212,6 +212,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
}
}
+ @Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
/**
* Validates the current properties, returning ValidationResults for any
* invalid properties. All processor defined properties will be validated.
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
index 26ad590..f65bc3e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockReportingContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -59,6 +60,15 @@ public class MockReportingContext extends MockControllerServiceLookup implements
}
@Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
+ @Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);
return new MockPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this, variableRegistry);
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
index fbd2a36..564ec54 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockValidationContext.java
@@ -17,6 +17,7 @@
package org.apache.nifi.util;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -89,6 +90,15 @@ public class MockValidationContext implements ValidationContext, ControllerServi
}
@Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
+ @Override
public String getAnnotationData() {
return context.getAnnotationData();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index 8f8b231..62183cc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -40,6 +40,7 @@ import org.apache.nifi.reporting.Severity;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@@ -108,6 +109,15 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
}
@Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
+ @Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);
return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, this, preparedQueries.get(property), variableRegistry);
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 0d755b0..3116401 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@ -195,6 +195,11 @@ public class ConnectableProcessContext implements ProcessContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ return new HashMap<>();
+ }
+
+ @Override
public Map<PropertyDescriptor, String> getProperties() {
return new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
index 61db819..c188d75 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardConfigurationContext.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.service;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -82,6 +83,15 @@ public class StandardConfigurationContext implements ConfigurationContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ final Map<String,String> propValueMap = new LinkedHashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
+ propValueMap.put(entry.getKey().getName(), entry.getValue());
+ }
+ return propValueMap;
+ }
+
+ @Override
public String getSchedulingPeriod() {
return schedulingPeriod;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
index d86a120..ace92c7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.state;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
@@ -47,6 +48,15 @@ public class StandardStateProviderInitializationContext implements StateProvider
}
@Override
+ public Map<String,String> getAllProperties() {
+ // Flatten descriptor -> PropertyValue into descriptor name -> raw string value. A
+ // LinkedHashMap is used so the result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, propertyValue) -> valuesByName.put(descriptor.getName(), propertyValue.getValue()));
+ return valuesByName;
+ }
+
+ @Override
public PropertyValue getProperty(final PropertyDescriptor property) {
return properties.get(property);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index d63ae00..b365de2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -38,6 +38,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProvider;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.controller.state.ConfigParseException;
@@ -48,6 +49,7 @@ import org.apache.nifi.controller.state.config.StateProviderConfiguration;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContext;
import org.apache.nifi.registry.VariableRegistry;
@@ -232,7 +234,7 @@ public class StandardStateManagerProvider implements StateManagerProvider{
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
final Class<? extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class);
- return mgrClass.newInstance();
+ return withNarClassLoader(mgrClass.newInstance());
} finally {
if (ctxClassLoader != null) {
Thread.currentThread().setContextClassLoader(ctxClassLoader);
@@ -241,6 +243,129 @@ public class StandardStateManagerProvider implements StateManagerProvider{
}
/**
+ * Wrap the provider so that all method calls set the context class loader to the NAR's class loader before
+ * executing the actual provider.
+ *
+ * @param stateProvider the base provider to wrap
+ * @return the wrapped provider
+ */
+ private static StateProvider withNarClassLoader(final StateProvider stateProvider) {
+ // Every override below has the same shape: open a NarCloseable through withNarLoader(),
+ // delegate to the wrapped provider, and rely on try-with-resources to close the
+ // NarCloseable whether the delegate returns normally or throws.
+ return new StateProvider() {
+ @Override
+ public void initialize(final StateProviderInitializationContext initContext) throws IOException {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.initialize(initContext);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.shutdown();
+ }
+ }
+
+ @Override
+ public void setState(final Map<String, String> newState, final String componentId) throws IOException {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.setState(newState, componentId);
+ }
+ }
+
+ @Override
+ public StateMap getState(final String componentId) throws IOException {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.getState(componentId);
+ }
+ }
+
+ @Override
+ public boolean replace(final StateMap expected, final Map<String, String> replacement, final String componentId) throws IOException {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.replace(expected, replacement, componentId);
+ }
+ }
+
+ @Override
+ public void clear(final String componentId) throws IOException {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.clear(componentId);
+ }
+ }
+
+ @Override
+ public void onComponentRemoved(final String componentId) throws IOException {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.onComponentRemoved(componentId);
+ }
+ }
+
+ @Override
+ public void enable() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.enable();
+ }
+ }
+
+ @Override
+ public void disable() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.disable();
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.isEnabled();
+ }
+ }
+
+ @Override
+ public Scope[] getSupportedScopes() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.getSupportedScopes();
+ }
+ }
+
+ @Override
+ public Collection<ValidationResult> validate(final ValidationContext validationContext) {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.validate(validationContext);
+ }
+ }
+
+ @Override
+ public PropertyDescriptor getPropertyDescriptor(final String descriptorName) {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.getPropertyDescriptor(descriptorName);
+ }
+ }
+
+ @Override
+ public void onPropertyModified(final PropertyDescriptor modified, final String previousValue, final String currentValue) {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ stateProvider.onPropertyModified(modified, previousValue, currentValue);
+ }
+ }
+
+ @Override
+ public List<PropertyDescriptor> getPropertyDescriptors() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.getPropertyDescriptors();
+ }
+ }
+
+ @Override
+ public String getIdentifier() {
+ try (final NarCloseable ignored = NarCloseable.withNarLoader()) {
+ return stateProvider.getIdentifier();
+ }
+ }
+ };
+ }
+
+ /**
* Returns the State Manager that has been created for the given component ID, or <code>null</code> if none exists
*
* @return the StateManager that can be used by the component with the given ID, or <code>null</code> if none exists
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 83906e2..2714392 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processor;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -126,6 +127,15 @@ public class StandardProcessContext implements ProcessContext, ControllerService
}
@Override
+ public Map<String, String> getAllProperties() {
+ // Re-key each configured property by its descriptor's name. A LinkedHashMap is used so the
+ // result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, value) -> valuesByName.put(descriptor.getName(), value));
+ return valuesByName;
+ }
+
+ @Override
public String encrypt(final String unencrypted) {
return encryptor.encrypt(unencrypted);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 86518b8..1f5cfee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processor;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -99,6 +100,15 @@ public class StandardSchedulingContext implements SchedulingContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ // Re-key each configured property by its descriptor's name. A LinkedHashMap is used so the
+ // result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, value) -> valuesByName.put(descriptor.getName(), value));
+ return valuesByName;
+ }
+
+ @Override
public String encrypt(final String unencrypted) {
return processContext.encrypt(unencrypted);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
index dfc7965..662169c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardValidationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processor;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -119,6 +120,15 @@ public class StandardValidationContext implements ValidationContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ // Re-key each configured property by its descriptor's name. A LinkedHashMap is used so the
+ // result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, value) -> valuesByName.put(descriptor.getName(), value));
+ return valuesByName;
+ }
+
+ @Override
public String getAnnotationData() {
return annotationData;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
index c248d25..32b6f53 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardProcessorNode.java
@@ -62,6 +62,7 @@ import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
@@ -513,6 +514,15 @@ public class TestStandardProcessorNode {
}
@Override
+ public Map<String, String> getAllProperties() {
+ // Re-key each property returned by getProperties() by its descriptor's name. A
+ // LinkedHashMap is used so the result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, value) -> valuesByName.put(descriptor.getName(), value));
+ return valuesByName;
+ }
+
+ @Override
public String getAnnotationData() {
return null;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
index d2e4a05..41b7d9d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/local/TestWriteAheadLocalStateProvider.java
@@ -20,6 +20,7 @@ package org.apache.nifi.controller.state.providers.local;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
@@ -62,6 +63,15 @@ public class TestWriteAheadLocalStateProvider extends AbstractTestStateProvider
}
@Override
+ public Map<String,String> getAllProperties() {
+ // Flatten descriptor -> PropertyValue into descriptor name -> raw string value. A
+ // LinkedHashMap is used so the result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, propertyValue) -> valuesByName.put(descriptor.getName(), propertyValue.getValue()));
+ return valuesByName;
+ }
+
+ @Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final PropertyValue prop = properties.get(property);
if (prop == null) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
index d09ee1f..037b350 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.state.providers.zookeeper;
import java.io.IOException;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.Map;
import javax.net.ssl.SSLContext;
@@ -76,6 +77,15 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
}
@Override
+ public Map<String,String> getAllProperties() {
+ // Flatten descriptor -> PropertyValue into descriptor name -> raw string value. A
+ // LinkedHashMap is used so the result iterates in the order the entries were inserted.
+ final Map<String, String> valuesByName = new LinkedHashMap<>();
+ getProperties().forEach((descriptor, propertyValue) -> valuesByName.put(descriptor.getName(), propertyValue.getValue()));
+ return valuesByName;
+ }
+
+ @Override
public PropertyValue getProperty(final PropertyDescriptor property) {
final String prop = properties.get(property);
return new StandardPropertyValue(prop, null);
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
index d1e73fb..d9a1b37 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockConfigurationContext.java
@@ -32,6 +32,11 @@ public class MockConfigurationContext implements ConfigurationContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ // This mock always reports an empty, immutable map of property names to values.
+ final Map<String, String> noProperties = Collections.emptyMap();
+ return noProperties;
+ }
+
+ @Override
public Map<PropertyDescriptor, String> getProperties() {
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
index cf2e2cf..cb17324 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-nar-utils/src/main/java/org/apache/nifi/mock/MockProcessContext.java
@@ -65,6 +65,11 @@ public class MockProcessContext implements ProcessContext {
}
@Override
+ public Map<String, String> getAllProperties() {
+ // This mock always reports an empty, immutable map of property names to values.
+ final Map<String, String> noProperties = Collections.emptyMap();
+ return noProperties;
+ }
+
+ @Override
public String encrypt(String unencrypted) {
return unencrypted;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
index d7631c2..dcd7ee6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml
@@ -63,4 +63,72 @@
<property name="Session Timeout">10 seconds</property>
<property name="Access Control">Open</property>
</cluster-provider>
+
+ <!--
+ Cluster State Provider that stores state in Redis. This can be used as an alternative to the ZooKeeper State Provider.
+
+ This provider requires the following properties:
+
+ Redis Mode - The type of Redis instance:
+ - Standalone
+ - Sentinel
+ - Cluster (currently not supported for state-management due to use of WATCH command which Redis does not support in clustered mode)
+
+ Connection String - The connection string for Redis.
+ - In a standalone instance this value will be of the form hostname:port.
+ - In a sentinel instance this value will be the comma-separated list of sentinels, such as host1:port1,host2:port2,host3:port3.
+ - In a clustered instance this value will be the comma-separated list of cluster masters, such as host1:port,host2:port,host3:port.
+
+ This provider has the following optional properties:
+
+ Key Prefix - The prefix for each key stored by this state provider. When sharing a single Redis across multiple NiFi instances, setting a unique
+ value for the Key Prefix will make it easier to identify which instances the keys came from (default nifi/components/).
+
+ Database Index - The database index to be used by connections created from this connection pool.
+ See the databases property in redis.conf, by default databases 0-15 will be available.
+
+ Communication Timeout - The timeout to use when attempting to communicate with Redis.
+
+ Cluster Max Redirects - The maximum number of redirects that can be performed when clustered.
+
+ Sentinel Master - The name of the sentinel master, required when Redis Mode is set to Sentinel.
+
+ Password - The password used to authenticate to the Redis server. See the requirepass property in redis.conf.
+
+ Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
+ A negative value indicates that there is no limit.
+
+ Pool - Max Idle - The maximum number of idle connections that can be held in the pool, or a negative value if there is no limit.
+
+ Pool - Min Idle - The target for the minimum number of idle connections to maintain in the pool. If the configured value of Min Idle is
+ greater than the configured value for Max Idle, then the value of Max Idle will be used instead.
+
+ Pool - Block When Exhausted - Whether or not clients should block and wait when trying to obtain a connection from the pool when the pool
+ has no available connections. Setting this to false means an error will occur immediately when a client requests a connection and
+ none are available.
+
+ Pool - Max Wait Time - The amount of time to wait for an available connection when Block When Exhausted is set to true.
+
+ Pool - Min Evictable Idle Time - The minimum amount of time an object may sit idle in the pool before it is eligible for eviction.
+
+ Pool - Time Between Eviction Runs - The amount of time between attempting to evict idle connections from the pool.
+
+ Pool - Num Tests Per Eviction Run - The number of connections to test per eviction attempt. A negative value indicates that all connections should be tested.
+
+ Pool - Test On Create - Whether or not connections should be tested upon creation (default false).
+
+ Pool - Test On Borrow - Whether or not connections should be tested upon borrowing from the pool (default false).
+
+ Pool - Test On Return - Whether or not connections should be tested upon returning to the pool (default false).
+
+ Pool - Test While Idle - Whether or not connections should be tested while idle (default true).
+
+ <cluster-provider>
+ <id>redis-provider</id>
+ <class>org.apache.nifi.redis.state.RedisStateProvider</class>
+ <property name="Redis Mode">Standalone</property>
+ <property name="Connection String">localhost:6379</property>
+ </cluster-provider>
+ -->
+
</stateManagement>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
new file mode 100644
index 0000000..e59220d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-redis-bundle</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-redis-extensions</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <!-- Provided deps from nifi-redis-service-api -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-distributed-cache-client-service-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-redis-service-api</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.data</groupId>
+ <artifactId>spring-data-redis</artifactId>
+ <version>${spring.data.redis.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>2.9.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- End Provided deps from nifi-redis-service-api -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.github.kstyrc</groupId>
+ <artifactId>embedded-redis</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
new file mode 100644
index 0000000..68169f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.service;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.redis.util.RedisUtils;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Controller service that provides Redis connections obtained from a single
+ * {@link JedisConnectionFactory}.
+ *
+ * <p>The factory is not built when the service is enabled; it is created on the first call to
+ * {@link #getConnection()} from the context captured in {@link #onEnabled(ConfigurationContext)},
+ * and destroyed in {@link #onDisabled()}.</p>
+ */
+@Tags({"redis", "cache"})
+@CapabilityDescription("A service that provides connections to Redis.")
+public class RedisConnectionPoolService extends AbstractControllerService implements RedisConnectionPool {
+
+ // All three fields are written by the lifecycle callbacks and read by getConnection()/getRedisType();
+ // they are volatile so those writes are visible to whichever thread performs the read.
+ private volatile PropertyContext context;
+ private volatile RedisType redisType;
+ private volatile JedisConnectionFactory connectionFactory;
+
+ /**
+ * @return the shared Redis connection property descriptors defined by {@link RedisUtils}
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
+ }
+
+ /**
+ * Delegates validation of the configured properties to {@link RedisUtils#validate}.
+ *
+ * @param validationContext the context holding the property values to validate
+ * @return the validation results produced by {@link RedisUtils}
+ */
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ return RedisUtils.validate(validationContext);
+ }
+
+ /**
+ * Captures the configuration for later, lazy creation of the connection factory and resolves
+ * the configured Redis mode into a {@link RedisType}.
+ *
+ * @param context the configuration the service was enabled with
+ */
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.context = context;
+
+ final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
+ this.redisType = RedisType.fromDisplayName(redisMode);
+ }
+
+ /**
+ * Destroys the connection factory, if one was created, and clears all state captured while
+ * the service was enabled.
+ */
+ @OnDisabled
+ public void onDisabled() {
+ if (connectionFactory != null) {
+ connectionFactory.destroy();
+ connectionFactory = null;
+ }
+
+ // Reset unconditionally: the factory is created lazily, so it can still be null here even
+ // though onEnabled stored a context and Redis type. Those must not survive the disable.
+ redisType = null;
+ context = null;
+ }
+
+ /**
+ * @return the Redis type resolved in {@link #onEnabled(ConfigurationContext)}, or
+ * {@code null} once the service has been disabled
+ */
+ @Override
+ public RedisType getRedisType() {
+ return redisType;
+ }
+
+ /**
+ * Returns a connection from the connection factory, creating the factory on first use.
+ *
+ * @return a connection obtained from the underlying {@link JedisConnectionFactory}
+ */
+ @Override
+ public RedisConnection getConnection() {
+ // Check, lock, re-check so that only one caller builds the factory.
+ if (connectionFactory == null) {
+ synchronized (this) {
+ if (connectionFactory == null) {
+ connectionFactory = RedisUtils.createConnectionFactory(context, getLogger());
+ }
+ }
+ }
+
+ return connectionFactory.getConnection();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
new file mode 100644
index 0000000..94b195c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.service;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
+import org.apache.nifi.distributed.cache.client.Deserializer;
+import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.redis.RedisConnectionPool;
+import org.apache.nifi.redis.RedisType;
+import org.apache.nifi.redis.util.RedisAction;
+import org.apache.nifi.util.Tuple;
+import org.springframework.data.redis.connection.RedisConnection;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.ScanOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+@Tags({ "redis", "distributed", "cache", "map" })
+@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
+ "the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
+ "can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " +
+ "provide high-availability configurations.")
+public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
+
+ // Required property identifying the RedisConnectionPool controller service used to obtain connections
+ public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
+ .name("redis-connection-pool")
+ .displayName("Redis Connection Pool")
+ .identifiesControllerService(RedisConnectionPool.class)
+ .required(true)
+ .build();
+
+ // The only supported property is the connection pool, so an immutable single-element list is sufficient
+ static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(REDIS_CONNECTION_POOL);
+
+ // Assigned in onEnabled() from the REDIS_CONNECTION_POOL property and cleared in onDisabled()
+ private volatile RedisConnectionPool redisConnectionPool;
+
+ /**
+ * @return the unmodifiable list of properties supported by this service
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final RedisConnectionPool redisConnectionPool = validationContext.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+ if (redisConnectionPool != null) {
+ final RedisType redisType = redisConnectionPool.getRedisType();
+ if (redisType != null && redisType == RedisType.CLUSTER) {
+ results.add(new ValidationResult.Builder()
+ .subject(REDIS_CONNECTION_POOL.getDisplayName())
+ .valid(false)
+ .explanation(REDIS_CONNECTION_POOL.getDisplayName()
+ + " is configured in clustered mode, and this service requires a non-clustered Redis")
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ /**
+ * Resolves the configured connection pool service when this service is enabled.
+ *
+ * @param context the configuration context supplying the REDIS_CONNECTION_POOL property
+ */
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
+ }
+
+ /**
+ * Drops the reference to the connection pool when this service is disabled.
+ */
+ @OnDisabled
+ public void onDisabled() {
+ this.redisConnectionPool = null;
+ }
+
+ /**
+  * Stores the value under the key using SETNX.
+  *
+  * @return the result of the SETNX call for the serialized key and value
+  * @throws IOException if serialization or the Redis action fails
+  */
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+     return withConnection(conn -> {
+         final Tuple<byte[],byte[]> serialized = serialize(key, value, keySerializer, valueSerializer);
+         final byte[] keyBytes = serialized.getKey();
+         final byte[] valueBytes = serialized.getValue();
+         return conn.setNX(keyBytes, valueBytes);
+     });
+ }
+
+ /**
+  * Atomically stores the value if the key is absent, using WATCH/MULTI/EXEC around a SETNX.
+  * The transaction is retried for as long as this service is enabled.
+  *
+  * @return null if the value was stored (key was absent), otherwise the deserialized existing value;
+  *         also null if the service is disabled before a transaction succeeds
+  * @throws IOException if serialization fails or the transaction returns a non-Boolean result
+  */
+ @Override
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
+     return withConnection(redisConnection -> {
+         final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
+         do {
+             // start a watch on the key and retrieve the current value
+             redisConnection.watch(kv.getKey());
+             final byte[] existingValue = redisConnection.get(kv.getKey());
+
+             // start a transaction and perform the put-if-absent
+             redisConnection.multi();
+             redisConnection.setNX(kv.getKey(), kv.getValue());
+
+             // execute the transaction
+             final List<Object> results = redisConnection.exec();
+
+             // a null or empty results list means the transaction did not run (i.e. key was modified after we started
+             // watching), so keep looping to retry; otherwise the first entry is the result of the setNX operation
+             if (results == null || results.isEmpty()) {
+                 continue;
+             }
+
+             final Object firstResult = results.get(0);
+             if (firstResult instanceof Boolean) {
+                 final Boolean absent = (Boolean) firstResult;
+                 return absent ? null : valueDeserializer.deserialize(existingValue);
+             }
+
+             // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop;
+             // guard against a null entry so the intended IOException is raised instead of a NullPointerException
+             final String resultType = firstResult == null ? "null" : firstResult.getClass().getName();
+             throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got "
+                     + resultType + " with value " + firstResult);
+         } while (isEnabled());
+
+         return null;
+     });
+ }
+
+
+ /**
+  * Checks for the key using EXISTS.
+  *
+  * @return the result of the EXISTS call for the serialized key
+  * @throws IOException if serialization or the Redis action fails
+  */
+ @Override
+ public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+     return withConnection(conn -> conn.exists(serialize(key, keySerializer)));
+ }
+
+ /**
+  * Unconditionally stores the serialized value under the serialized key using SET.
+  *
+  * @throws IOException if serialization or the Redis action fails
+  */
+ @Override
+ public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+     withConnection(conn -> {
+         final Tuple<byte[],byte[]> serialized = serialize(key, value, keySerializer, valueSerializer);
+         final byte[] keyBytes = serialized.getKey();
+         final byte[] valueBytes = serialized.getValue();
+         conn.set(keyBytes, valueBytes);
+         // the action must return something; there is no meaningful result for a plain SET
+         return null;
+     });
+ }
+
+ /**
+  * Reads the raw bytes stored under the serialized key and hands them to the deserializer.
+  *
+  * @return whatever the deserializer produces for the bytes returned by GET
+  * @throws IOException if serialization, deserialization or the Redis action fails
+  */
+ @Override
+ public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+     return withConnection(conn -> {
+         final byte[] rawValue = conn.get(serialize(key, keySerializer));
+         return valueDeserializer.deserialize(rawValue);
+     });
+ }
+
+ /**
+ * No-op: this service holds no connection of its own; each operation closes the connection it obtained from the pool.
+ */
+ @Override
+ public void close() throws IOException {
+ // nothing to do
+ }
+
+ /**
+  * Deletes the serialized key using DEL.
+  *
+  * @return true if DEL reported at least one removed key
+  * @throws IOException if serialization or the Redis action fails
+  */
+ @Override
+ public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
+     return withConnection(conn -> {
+         final long removedCount = conn.del(serialize(key, keySerializer));
+         return removedCount > 0;
+     });
+ }
+
+ /**
+  * Deletes every key returned by a SCAN whose MATCH option is set to the given pattern.
+  * Keys are collected from the cursor and deleted in batches of up to 1000.
+  *
+  * @param regex the pattern passed to the SCAN MATCH option
+  * @return the total of the counts reported by the DEL calls
+  * @throws IOException if the Redis action fails
+  */
+ @Override
+ public long removeByPattern(final String regex) throws IOException {
+     return withConnection(conn -> {
+         final List<byte[]> pendingKeys = new ArrayList<>();
+         long totalDeleted = 0;
+
+         final ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(regex).build();
+         final Cursor<byte[]> keyCursor = conn.scan(scanOptions);
+
+         while (keyCursor.hasNext()) {
+             pendingKeys.add(keyCursor.next());
+
+             // flush a full batch of 1000 keys
+             if (pendingKeys.size() == 1000) {
+                 totalDeleted += conn.del(getKeys(pendingKeys));
+                 pendingKeys.clear();
+             }
+         }
+
+         // flush whatever is left from the last, partially filled batch
+         if (!pendingKeys.isEmpty()) {
+             totalDeleted += conn.del(getKeys(pendingKeys));
+             pendingKeys.clear();
+         }
+
+         return totalDeleted;
+     });
+ }
+
+ /**
+  * Copies the given keys, in order, into a new array of byte arrays.
+  *
+  * @param keys the keys to copy
+  * @return an array with one entry per key
+  */
+ private byte[][] getKeys(final List<byte[]> keys) {
+     return keys.toArray(new byte[keys.size()][]);
+ }
+
+ // ----------------- Methods from AtomicDistributedMapCacheClient ------------------------
+
+ /**
+  * Reads the entry stored under the key, using the raw stored bytes as the entry's revision.
+  *
+  * @return null if GET returns no value, otherwise an entry holding the key, the deserialized value and the raw bytes
+  * @throws IOException if serialization, deserialization or the Redis action fails
+  */
+ @Override
+ public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+     return withConnection(conn -> {
+         final byte[] rawKey = serialize(key, keySerializer);
+         final byte[] rawValue = conn.get(rawKey);
+
+         if (rawValue != null) {
+             // the raw bytes double as the revision that replace() later compares against
+             return new AtomicCacheEntry<>(key, valueDeserializer.deserialize(rawValue), rawValue);
+         }
+
+         return null;
+     });
+ }
+
+ /**
+  * Compare-and-set: replaces the stored value only if it still equals the revision held by the entry.
+  * The comparison and the write are wrapped in WATCH/MULTI/EXEC.
+  *
+  * @param entry the entry whose revision holds the raw bytes seen by a previous fetch (or is empty)
+  * @return true if the transaction executed and produced a result, false otherwise
+  * @throws IOException if serialization or the Redis action fails
+  */
+ @Override
+ public <K, V> boolean replace(final AtomicCacheEntry<K, V, byte[]> entry, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+     return withConnection(redisConnection -> {
+         final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
+         final byte[] k = kv.getKey();
+         final byte[] newVal = kv.getValue();
+
+         // the revision of the cache entry holds the value of the key from a previous fetch
+         final byte[] prevVal = entry.getRevision().orElse(null);
+
+         // start a watch on the key and retrieve the current value
+         redisConnection.watch(k);
+         final byte[] currValue = redisConnection.get(k);
+
+         // start a transaction
+         redisConnection.multi();
+
+         // compare-and-set
+         if (Arrays.equals(prevVal, currValue)) {
+             // if we use set(k, newVal) then the results list will always have size == 0 b/c when convertPipelineAndTxResults is set to true,
+             // status responses like "OK" are skipped over, so by using getSet we can rely on the results list to know if the transaction succeeded
+             redisConnection.getSet(k, newVal);
+         }
+
+         // execute the transaction
+         final List<Object> results = redisConnection.exec();
+
+         // a null or empty list means nothing was executed (aborted transaction or failed comparison);
+         // any result means the getSet ran and the replace succeeded
+         return results != null && !results.isEmpty();
+     });
+ }
+
+ // ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------
+
+ /**
+  * Serializes a key and a value into their byte representations.
+  *
+  * @return a tuple of (serialized key, serialized value)
+  * @throws IOException if either serializer fails
+  */
+ private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
+     final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+
+     keySerializer.serialize(key, buffer);
+     final byte[] keyBytes = buffer.toByteArray();
+
+     // reuse the same buffer for the value
+     buffer.reset();
+     valueSerializer.serialize(value, buffer);
+
+     return new Tuple<>(keyBytes, buffer.toByteArray());
+ }
+
+ /**
+  * Serializes a key into its byte representation.
+  *
+  * @return the bytes written by the key serializer
+  * @throws IOException if the serializer fails
+  */
+ private <K> byte[] serialize(final K key, final Serializer<K> keySerializer) throws IOException {
+     final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+     keySerializer.serialize(key, buffer);
+
+     final byte[] keyBytes = buffer.toByteArray();
+     return keyBytes;
+ }
+
+ private <T> T withConnection(final RedisAction<T> action) throws IOException {
+ RedisConnection redisConnection = null;
+ try {
+ redisConnection = redisConnectionPool.getConnection();
+ return action.execute(redisConnection);
+ } finally {
+ if (redisConnection != null) {
+ try {
+ redisConnection.close();
+ } catch (Exception e) {
+ getLogger().warn("Error closing connection: " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aabd4a25/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java
new file mode 100644
index 0000000..068b7a6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateMap.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.redis.state;
+
+import org.apache.nifi.components.state.StateMap;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * A StateMap implementation for RedisStateProvider.
+ */
+public class RedisStateMap implements StateMap {
+
+ public static final Long DEFAULT_VERSION = new Long(-1);
+ public static final Integer DEFAULT_ENCODING = new Integer(1);
+
+ private final Long version;
+ private final Integer encodingVersion;
+ private final Map<String,String> stateValues;
+
+ private RedisStateMap(final Builder builder) {
+ this.version = builder.version == null ? DEFAULT_VERSION : builder.version;
+ this.encodingVersion = builder.encodingVersion == null ? DEFAULT_ENCODING : builder.encodingVersion;
+ this.stateValues = Collections.unmodifiableMap(new TreeMap<>(builder.stateValues));
+ Objects.requireNonNull(version, "Version must be non-null");
+ Objects.requireNonNull(encodingVersion, "Encoding Version must be non-null");
+ Objects.requireNonNull(stateValues, "State Values must be non-null");
+ }
+
+ @Override
+ public long getVersion() {
+ return version;
+ }
+
+ @Override
+ public String get(String key) {
+ return stateValues.get(key);
+ }
+
+ @Override
+ public Map<String, String> toMap() {
+ return stateValues;
+ }
+
+ public Integer getEncodingVersion() {
+ return encodingVersion;
+ }
+
+ public static class Builder {
+
+ private Long version;
+ private Integer encodingVersion;
+ private Map<String,String> stateValues = new TreeMap<>();
+
+ public Builder version(final Long version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder encodingVersion(final Integer encodingVersion) {
+ this.encodingVersion = encodingVersion;
+ return this;
+ }
+
+ public Builder stateValue(final String name, String value) {
+ stateValues.put(name, value);
+ return this;
+ }
+
+ public Builder stateValues(final Map<String,String> stateValues) {
+ this.stateValues.clear();
+ if (stateValues != null) {
+ this.stateValues.putAll(stateValues);
+ }
+ return this;
+ }
+
+ public RedisStateMap build() {
+ return new RedisStateMap(this);
+ }
+ }
+
+}