You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/02/03 22:36:39 UTC

[06/12] nifi git commit: NIFI-259: Extending the StateProvider interface to provide a getSupportedScopes method and implemented this based on the capabilities of each of its implementations. Used supported scope to evaluated configurations at startup an

NIFI-259: Extended the StateProvider interface with a getSupportedScopes method and implemented it based on the capabilities of each of its implementations.  Used the supported scopes to evaluate configurations at startup and prevent issues when trying to make use of state


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/447e4019
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/447e4019
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/447e4019

Branch: refs/heads/NIFI-259
Commit: 447e401912100ab9ed73d442e1e8c0c0bb229725
Parents: 257eca9
Author: Aldrin Piri <al...@apache.org>
Authored: Mon Feb 1 15:26:05 2016 -0500
Committer: Aldrin Piri <al...@apache.org>
Committed: Mon Feb 1 16:29:12 2016 -0500

----------------------------------------------------------------------
 .../apache/nifi/components/state/StateProvider.java    |  6 ++++++
 .../state/manager/StandardStateManagerProvider.java    |  7 +++++++
 .../providers/local/WriteAheadLocalStateProvider.java  |  8 +++++++-
 .../providers/zookeeper/ZooKeeperStateProvider.java    | 13 ++++++++++++-
 4 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/447e4019/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
index e0243f3..e1e4352 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateProvider.java
@@ -124,4 +124,10 @@ public interface StateProvider extends ConfigurableComponent {
      * @return <code>true</code> if the provider is enabled, <code>false</code> otherwise.
      */
     boolean isEnabled();
+
+    /**
+     * Provides a listing of the {@link Scope}s supported by this StateProvider.
+     * @return the {@link Scope}s supported by this StateProvider
+     */
+    Scope[] getSupportedScopes();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/447e4019/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
index b7671f6..ef46f55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/manager/StandardStateManagerProvider.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import javax.net.ssl.SSLContext;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
@@ -159,6 +160,12 @@ public class StandardStateManagerProvider implements StateManagerProvider {
             throw new RuntimeException("Cannot create " + providerDescription + " of type " + providerClassName, e);
         }
 
+        if (!ArrayUtils.contains(provider.getSupportedScopes(), scope)) {
+            throw new RuntimeException("Cannot use " + providerDescription + " ("+providerClassName+") as it only supports scope(s) " + ArrayUtils.toString(provider.getSupportedScopes()) + " but " +
+                "instance"
+                + " is configured to use scope " + scope);
+        }
+
         final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>();
         final Map<PropertyDescriptor, String> propertyStringMap = new HashMap<>();
         for (final PropertyDescriptor descriptor : provider.getPropertyDescriptors()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/447e4019/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
index 7d16a10..da46ed0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
 import org.apache.nifi.controller.state.StandardStateMap;
@@ -48,7 +49,7 @@ import org.wali.UpdateType;
 import org.wali.WriteAheadRepository;
 
 /**
- * Provides state management for local (node-only) state, backed by a write-ahead log
+ * Provides state management for local (standalone) state, backed by a write-ahead log
  */
 public class WriteAheadLocalStateProvider extends AbstractStateProvider {
     private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
@@ -180,6 +181,11 @@ public class WriteAheadLocalStateProvider extends AbstractStateProvider {
         componentProviders.remove(componentId);
     }
 
+    @Override
+    public Scope[] getSupportedScopes() {
+        return new Scope[]{Scope.LOCAL};
+    }
+
     private static class ComponentProvider {
         private final AtomicLong versionGenerator;
         private final WriteAheadRepository<StateMapUpdate> wal;

http://git-wip-us.apache.org/repos/asf/nifi/blob/447e4019/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index 8ce74b4..5c8b4c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -37,6 +37,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
 import org.apache.nifi.controller.state.StandardStateMap;
@@ -54,6 +55,11 @@ import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
+/**
+ * ZooKeeperStateProvider utilizes a ZooKeeper-based store that is either provided internally, by configuring and enabling the {@link org.apache.nifi.controller.state.server.ZooKeeperStateServer},
+ * or hosted at an externally configured location.  This implementation caters to a clustered NiFi environment and accordingly provides only {@link Scope#CLUSTER} scoping, in order to enforce
+ * consistency across configuration interactions.
+ */
 public class ZooKeeperStateProvider extends AbstractStateProvider {
     static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client.");
     static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly",
@@ -228,7 +234,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
 
     private void verifyEnabled() throws IOException {
         if (!isEnabled()) {
-            throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster");
+            throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster.");
         }
     }
 
@@ -255,6 +261,11 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
     }
 
     @Override
+    public Scope[] getSupportedScopes() {
+        return new Scope[]{Scope.CLUSTER};
+    }
+
+    @Override
     public void setState(final Map<String, String> state, final String componentId) throws IOException {
         setState(state, -1, componentId);
     }