You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/05/17 21:36:44 UTC

nifi git commit: NIFI-1963 Allows a node reconnecting to the cluster to inherit non-fingerprinted processor settings. Forces a node reconnecting to a cluster to serialize the updated flow to disk. Added most processor settings to the flow fingerprint (excluding name, style, comment, position, and schedule state).

Repository: nifi
Updated Branches:
  refs/heads/master 289dde098 -> 8e1c79eaa


NIFI-1963 Allows a node reconnecting to the cluster to inherit non-fingerprinted processor settings.
Forces a node reconnecting to a cluster to serialize the updated flow to disk.
Added most processor settings to the flow fingerprint (excluding name, style, comment, position, and schedule state).
Updated some test data for FingerprintFactoryTest to cover the new fields added to the flow fingerprint.
Updated StandardProcessorNode to allow a processor's comments and name to be set while the processor is running.
Updated StandardFlowSynchronizer to inherit non-fingerprinted processor settings (name, style, comment, and position) when the flow is already synchronized.
This closes #1812


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8e1c79ea
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8e1c79ea
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8e1c79ea

Branch: refs/heads/master
Commit: 8e1c79eaafe886e85e4ceaf8436b961eccfef568
Parents: 289dde0
Author: Jeff Storck <jt...@gmail.com>
Authored: Mon May 15 17:21:11 2017 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed May 17 17:35:33 2017 -0400

----------------------------------------------------------------------
 .../nifi/controller/StandardFlowService.java    |  4 ++-
 .../controller/StandardFlowSynchronizer.java    | 14 +++++++---
 .../nifi/controller/StandardProcessorNode.java  |  6 -----
 .../nifi/fingerprint/FingerprintFactory.java    | 19 ++++++++++++++
 .../test/resources/nifi/fingerprint/flow1a.xml  | 17 +++++++++---
 .../test/resources/nifi/fingerprint/flow1b.xml  | 27 +++++++++++++++-----
 .../test/resources/nifi/fingerprint/flow2.xml   | 17 +++++++++---
 7 files changed, 80 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 0ce6742..b2c1628 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -643,7 +643,9 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
 
             clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream()
                     .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status)));
-            controller.resumeHeartbeats();  // we are now connected, so resume sending heartbeats.
+            // reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats
+            saveFlowChanges();
+            controller.resumeHeartbeats();
 
             logger.info("Node reconnected.");
         } catch (final Exception ex) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 09338c9..975f954 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -725,6 +725,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
             final ProcessorDTO dto = FlowFromDOMFactory.getProcessor(processorElement, encryptor);
             final ProcessorNode procNode = processGroup.getProcessor(dto.getId());
 
+            updateNonFingerprintedProcessorSettings(procNode, dto);
+
             if (!procNode.getScheduledState().name().equals(dto.getState())) {
                 try {
                     switch (ScheduledState.valueOf(dto.getState())) {
@@ -964,15 +966,12 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
     private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller)
             throws ProcessorInstantiationException {
         final ProcessorConfigDTO config = processorDTO.getConfig();
-        procNode.setPosition(toPosition(processorDTO.getPosition()));
-        procNode.setName(processorDTO.getName());
-        procNode.setStyle(processorDTO.getStyle());
         procNode.setProcessGroup(processGroup);
-        procNode.setComments(config.getComments());
         procNode.setLossTolerant(config.isLossTolerant());
         procNode.setPenalizationPeriod(config.getPenaltyDuration());
         procNode.setYieldPeriod(config.getYieldDuration());
         procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
+        updateNonFingerprintedProcessorSettings(procNode, processorDTO);
 
         if (config.getSchedulingStrategy() != null) {
             procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
@@ -1011,6 +1010,13 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         }
     }
 
+    private void updateNonFingerprintedProcessorSettings(final ProcessorNode procNode, final ProcessorDTO processorDTO) {
+        procNode.setName(processorDTO.getName());
+        procNode.setPosition(toPosition(processorDTO.getPosition()));
+        procNode.setStyle(processorDTO.getStyle());
+        procNode.setComments(processorDTO.getConfig().getComments());
+    }
+
     private ProcessGroup addProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement,
             final StringEncryptor encryptor, final FlowEncodingVersion encodingVersion) throws ProcessorInstantiationException {
         // get the parent group ID

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index 1a1acc0..6d96a5c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -256,9 +256,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
      */
     @Override
     public synchronized void setComments(final String comments) {
-        if (isRunning()) {
-            throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-        }
         this.comments.set(CharacterFilterUtils.filterInvalidXmlCharacters(comments));
     }
 
@@ -405,9 +402,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
 
     @Override
     public synchronized void setName(final String name) {
-        if (isRunning()) {
-            throw new IllegalStateException("Cannot modify Processor configuration while the Processor is running");
-        }
         super.setName(name);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index d9e048e..1ef3e8b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -341,6 +341,25 @@ public class FingerprintFactory {
         final BundleDTO bundle = FlowFromDOMFactory.getBundle(DomUtils.getChild(processorElem, "bundle"));
         addBundleFingerprint(builder, bundle);
 
+        // max concurrent tasks
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "maxConcurrentTasks"));
+        // scheduling period
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingPeriod"));
+        // penalization period
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "penalizationPeriod"));
+        // yield period
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "yieldPeriod"));
+        // bulletin level
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "bulletinLevel"));
+        // loss tolerant
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "lossTolerant"));
+        // scheduling strategy
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "schedulingStrategy"));
+        // execution node
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "executionNode"));
+        // run duration nanos
+        appendFirstValue(builder, DomUtils.getChildNodesByTagName(processorElem, "runDurationNanos"));
+
         // get the temp instance of the Processor so that we know the default property values
         final BundleCoordinate coordinate = getCoordinate(className, bundle);
         final ConfigurableComponent configurableComponent = ExtensionManager.getTempComponent(className, coordinate);

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml
index beccdcc..2896767 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml
@@ -25,13 +25,24 @@
             <id>d89ada5d-35fb-44ff-83f1-4cc00b48b2df</id>
             <name>GenerateFlowFile</name>
             <position x="0.0" y="0.0"/>
-            <style>processor</style>
+            <styles/>
             <comment/>
             <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-standard-nar</artifact>
+                <version>1.3.0-SNAPSHOT</version>
+            </bundle>
             <maxConcurrentTasks>1</maxConcurrentTasks>
-            <schedulingPeriod>0 s</schedulingPeriod>
+            <schedulingPeriod>0 sec</schedulingPeriod>
+            <penalizationPeriod>30 sec</penalizationPeriod>
+            <yieldPeriod>1 sec</yieldPeriod>
+            <bulletinLevel>WARN</bulletinLevel>
             <lossTolerant>false</lossTolerant>
-            <running>false</running>
+            <scheduledState>RUNNING</scheduledState>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <executionNode>ALL</executionNode>
+            <runDurationNanos>0</runDurationNanos>
             <property>
                 <name>file.size</name>
                 <value>5</value>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml
index 19ed079..e8d95db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml
@@ -23,15 +23,28 @@
         <comment/>
         <processor>
             <id>d89ada5d-35fb-44ff-83f1-4cc00b48b2df</id>
-            <name>GenerateFlowFile</name>
-            <position x="0.0" y="0.0"/>
-            <style>processor</style>
-            <comment/>
+            <name>GenerateFlowFile1</name>
+            <position x="0.0" y="1.0"/>
+            <styles>
+                <style name="background-color">#00ff00</style>
+            </styles>
+            <comment>this is a comment</comment>
             <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-standard-nar</artifact>
+                <version>1.3.0-SNAPSHOT</version>
+            </bundle>
             <maxConcurrentTasks>1</maxConcurrentTasks>
-            <schedulingPeriod>0 s</schedulingPeriod>
+            <schedulingPeriod>0 sec</schedulingPeriod>
+            <penalizationPeriod>30 sec</penalizationPeriod>
+            <yieldPeriod>1 sec</yieldPeriod>
+            <bulletinLevel>WARN</bulletinLevel>
             <lossTolerant>false</lossTolerant>
-            <running>false</running>
+            <scheduledState>DISABLED</scheduledState>
+            <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+            <executionNode>ALL</executionNode>
+            <runDurationNanos>0</runDurationNanos>
             <property>
                 <name>file.size</name>
                 <value>5</value>
@@ -45,7 +58,7 @@
             <style>processor</style>
             <comment/>
             <class>org.apache.nifi.processors.standard.LogAttribute</class>
-            <maxConcurrentTasks>10</maxConcurrentTasks>
+            <maxConcurrentTasks>1</maxConcurrentTasks>
             <schedulingPeriod>0 s</schedulingPeriod>
             <lossTolerant>false</lossTolerant>
             <running>false</running>

http://git-wip-us.apache.org/repos/asf/nifi/blob/8e1c79ea/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml
index 8c0e641..bab1778 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow2.xml
@@ -25,13 +25,24 @@
             <id>d89ada5d-35fb-44ff-83f1-4cc00b48b2dd</id>
             <name>GenerateFlowFile</name>
             <position x="0.0" y="0.0"/>
-            <style>processor</style>
+            <styles/>
             <comment/>
             <class>org.apache.nifi.processors.standard.GenerateFlowFile</class>
+            <bundle>
+                <group>org.apache.nifi</group>
+                <artifact>nifi-standard-nar</artifact>
+                <version>1.4.0-SNAPSHOT</version>
+            </bundle>
             <maxConcurrentTasks>1</maxConcurrentTasks>
             <schedulingPeriod>0 s</schedulingPeriod>
-            <lossTolerant>false</lossTolerant>
-            <running>false</running>
+            <penalizationPeriod>30 s</penalizationPeriod>
+            <yieldPeriod>1 s</yieldPeriod>
+            <bulletinLevel>ERROR</bulletinLevel>
+            <lossTolerant>true</lossTolerant>
+            <scheduledState>RUNNING</scheduledState>
+            <schedulingStrategy>CRON_DRIVEN</schedulingStrategy>
+            <executionNode>PRIMARY</executionNode>
+            <runDurationNanos>1</runDurationNanos>
             <property>
                 <name>file.size</name>
                 <value>5</value>