You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/26 18:37:52 UTC

nifi git commit: NIFI-2316, NIFI-2318: Ensure that we do not save the flow before initializing the Run Status of components. Clarify the Node Event messages

Repository: nifi
Updated Branches:
  refs/heads/master 69586d8bd -> 52bc23f5d


NIFI-2316, NIFI-2318: Ensure that we do not save the flow before initializing the Run Status of components. Clarify the Node Event messages

This closes #678

Signed-off-by: jpercivall <jo...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/52bc23f5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/52bc23f5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/52bc23f5

Branch: refs/heads/master
Commit: 52bc23f5dbcc3a236df1fa88744c277e5d60e244
Parents: 69586d8
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jul 19 14:49:38 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Tue Jul 26 14:24:32 2016 -0400

----------------------------------------------------------------------
 .../node/NodeClusterCoordinator.java            | 60 +++++++++++++++---
 .../apache/nifi/controller/FlowController.java  | 26 +++-----
 .../nifi/controller/StandardFlowService.java    | 64 ++++++++++----------
 .../controller/StandardFlowSynchronizer.java    |  4 +-
 .../serialization/FlowFromDOMFactory.java       |  8 ++-
 .../StandardXMLFlowConfigurationDAO.java        | 12 +++-
 6 files changed, 113 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index b31530f..514d928 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -35,6 +35,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -770,6 +771,55 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
         }
     }
 
+    private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) {
+        final StringBuilder sb = new StringBuilder();
+
+        if (oldStatus != null && status.getState() == oldStatus.getState()) {
+            // Check if roles changed
+            final Set<String> oldRoles = oldStatus.getRoles();
+            final Set<String> newRoles = status.getRoles();
+
+            final Set<String> rolesRemoved = new HashSet<>(oldRoles);
+            rolesRemoved.removeAll(newRoles);
+
+            final Set<String> rolesAdded = new HashSet<>(newRoles);
+            rolesAdded.removeAll(oldRoles);
+
+            if (!rolesRemoved.isEmpty()) {
+                sb.append("Relinquished role");
+                if (rolesRemoved.size() != 1) {
+                    sb.append("s");
+                }
+
+                sb.append(" ").append(rolesRemoved);
+            }
+
+            if (!rolesAdded.isEmpty()) {
+                if (sb.length() > 0) {
+                    sb.append("; ");
+                }
+
+                sb.append("Acquired role");
+                if (rolesAdded.size() != 1) {
+                    sb.append("s");
+                }
+
+                sb.append(" ").append(rolesAdded);
+            }
+        } else {
+            sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString());
+            if (status.getState() == NodeConnectionState.CONNECTED) {
+                sb.append(" (Roles=").append(status.getRoles().toString()).append(")");
+            } else if (status.getDisconnectReason() != null) {
+                sb.append(" due to ").append(status.getDisconnectReason());
+            } else if (status.getDisconnectCode() != null) {
+                sb.append(" due to ").append(status.getDisconnectCode().toString());
+            }
+        }
+
+        return sb.toString();
+    }
+
     private void handleNodeStatusChange(final NodeStatusChangeMessage statusChangeMessage) {
         final NodeConnectionStatus updatedStatus = statusChangeMessage.getNodeConnectionStatus();
         final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
@@ -790,14 +840,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                     logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
 
                     final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
-                    final StringBuilder sb = new StringBuilder();
-                    sb.append("Connection Status changed to ").append(status.getState().toString());
-                    if (status.getDisconnectReason() != null) {
-                        sb.append(" due to ").append(status.getDisconnectReason());
-                    } else if (status.getDisconnectCode() != null) {
-                        sb.append(" due to ").append(status.getDisconnectCode().toString());
+                    final String summary = summarizeStatusChange(oldStatus, status);
+                    if (!StringUtils.isEmpty(summary)) {
+                        addNodeEvent(nodeId, summary);
                     }
-                    addNodeEvent(nodeId, sb.toString());
 
                     // Update our counter so that we are in-sync with the cluster on the
                     // most up-to-date version of the NodeConnectionStatus' Update Identifier.

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 5de77f6..2649895 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -565,7 +565,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)));
+        this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
+        heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
 
         if (configuredForClustering) {
             leaderElectionManager = new CuratorLeaderElectionManager(4);
@@ -1459,7 +1460,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
         } finally {
             writeLock.unlock();
         }
@@ -3349,7 +3350,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             }
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
         } finally {
             writeLock.unlock();
         }
@@ -3386,7 +3387,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         eventDrivenWorkerQueue.setPrimary(primary);
 
         // update the heartbeat bean
-        final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus));
+        final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary));
 
         // Emit a bulletin detailing the fact that the primary node state has changed
         if (oldBean == null || oldBean.isPrimary() != primary) {
@@ -3754,7 +3755,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isConnected() {
         rwLock.readLock().lock();
         try {
-            return connectionStatus.getState() == NodeConnectionState.CONNECTED;
+            return connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED;
         } finally {
             rwLock.readLock().unlock();
         }
@@ -3766,7 +3767,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             this.connectionStatus = connectionStatus;
 
             // update the heartbeat bean
-            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
+            this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
         } finally {
             rwLock.writeLock().unlock();
         }
@@ -3837,8 +3838,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             if (bean == null) {
                 readLock.lock();
                 try {
-                    final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(getNodeId(), DisconnectionCode.NOT_YET_CONNECTED);
-                    bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus);
+                    bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary());
                 } finally {
                     readLock.unlock();
                 }
@@ -3868,7 +3868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 roles.add(ClusterRoles.CLUSTER_COORDINATOR);
             }
 
-            final Heartbeat heartbeat = new Heartbeat(nodeId, roles, bean.getConnectionStatus(), hbPayload.marshal());
+            final Heartbeat heartbeat = new Heartbeat(nodeId, roles, connectionStatus, hbPayload.marshal());
             final HeartbeatMessage message = new HeartbeatMessage();
             message.setHeartbeat(heartbeat);
 
@@ -4002,12 +4002,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     private static class HeartbeatBean {
         private final ProcessGroup rootGroup;
         private final boolean primary;
-        private final NodeConnectionStatus connectionStatus;
 
-        public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) {
+        public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary) {
             this.rootGroup = rootGroup;
             this.primary = primary;
-            this.connectionStatus = connectionStatus;
         }
 
         public ProcessGroup getRootGroup() {
@@ -4017,9 +4015,5 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         public boolean isPrimary() {
             return primary;
         }
-
-        public NodeConnectionStatus getConnectionStatus() {
-            return connectionStatus;
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 4f286eb..71d66b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,6 +16,37 @@
  */
 package org.apache.nifi.controller;
 
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
 import org.apache.nifi.authorization.Authorizer;
@@ -63,37 +94,6 @@ import org.apache.nifi.web.revision.RevisionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
 public class StandardFlowService implements FlowService, ProtocolHandler {
 
     private static final String EVENT_CATEGORY = "Controller";
@@ -444,7 +444,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
              * the response will be null and we should load the local dataflow
              * and heartbeat until a manager is located.
              */
-            final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor);
+            final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow);
             final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty);
 
             // obtain write lock while we are updating the controller. We need to ensure that we don't

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index d6dee2c..bc28380 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -124,7 +124,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
     }
 
-    public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
+    public static boolean isEmpty(final DataFlow dataFlow) {
         if (dataFlow == null || dataFlow.getFlow() == null || dataFlow.getFlow().length == 0) {
             return true;
         }
@@ -135,7 +135,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
         final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
         final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement);
 
-        final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
+        final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion);
         return isEmpty(rootGroupDto);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 1409df4..2c51e96 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -260,7 +260,9 @@ public class FlowFromDOMFactory {
         dto.setProxyHost(getString(element, "proxyHost"));
         dto.setProxyPort(getOptionalInt(element, "proxyPort"));
         dto.setProxyUser(getString(element, "proxyUser"));
-        String proxyPassword = decrypt(getString(element, "proxyPassword"), encryptor);
+
+        final String rawPassword = getString(element, "proxyPassword");
+        final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor);
         dto.setProxyPassword(proxyPassword);
 
         return dto;
@@ -395,7 +397,9 @@ public class FlowFromDOMFactory {
         final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
         for (final Element propertyElement : propertyNodeList) {
             final String name = getString(propertyElement, "name");
-            final String value = decrypt(getString(propertyElement, "value"), encryptor);
+
+            final String rawPropertyValue = getString(propertyElement, "value");
+            final String value = encryptor == null ? rawPropertyValue : decrypt(rawPropertyValue, encryptor);
             properties.put(name, value);
         }
         return properties;

http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 8b0d18f..f73dce5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -80,8 +80,16 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
         final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor);
         controller.synchronize(flowSynchronizer, dataFlow);
 
-        // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
-        save(controller);
+        if (StandardFlowSynchronizer.isEmpty(dataFlow)) {
+            // If the dataflow is empty, we want to save it. We do this because when we start up a brand new cluster with no
+            // dataflow, we need to ensure that the flow is consistent across all nodes in the cluster and that upon restart
+            // of NiFi, the root group ID does not change. However, we don't always want to save it, because if the flow is
+            // not empty, then we can get into a bad situation, since the Processors, etc. don't have the appropriate "Scheduled
+            // State" yet (since they haven't yet been scheduled). So if there are components in the flow and we save it, we
+            // may end up saving the flow in such a way that all components are stopped.
+            // We save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
+            save(controller);
+        }
     }
 
     @Override