You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/07/26 18:37:52 UTC
nifi git commit: NIFI-2316,
NIFI-2318: Ensure that we do not save the flow before initializing
the Run Status of components. Clarify the Node Event messages
Repository: nifi
Updated Branches:
refs/heads/master 69586d8bd -> 52bc23f5d
NIFI-2316, NIFI-2318: Ensure that we do not save the flow before initializing the Run Status of components. Clarify the Node Event messages
This closes #678
Signed-off-by: jpercivall <jo...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/52bc23f5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/52bc23f5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/52bc23f5
Branch: refs/heads/master
Commit: 52bc23f5dbcc3a236df1fa88744c277e5d60e244
Parents: 69586d8
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jul 19 14:49:38 2016 -0400
Committer: jpercivall <jo...@yahoo.com>
Committed: Tue Jul 26 14:24:32 2016 -0400
----------------------------------------------------------------------
.../node/NodeClusterCoordinator.java | 60 +++++++++++++++---
.../apache/nifi/controller/FlowController.java | 26 +++-----
.../nifi/controller/StandardFlowService.java | 64 ++++++++++----------
.../controller/StandardFlowSynchronizer.java | 4 +-
.../serialization/FlowFromDOMFactory.java | 8 ++-
.../StandardXMLFlowConfigurationDAO.java | 12 +++-
6 files changed, 113 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index b31530f..514d928 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -35,6 +35,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -770,6 +771,55 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
}
}
+ private String summarizeStatusChange(final NodeConnectionStatus oldStatus, final NodeConnectionStatus status) {
+ final StringBuilder sb = new StringBuilder();
+ // Builds the node-event text: role changes when the state is unchanged, otherwise the state transition.
+ if (oldStatus != null && status.getState() == oldStatus.getState()) {
+ // State is the same as before, so the only thing left to report is a change in roles
+ final Set<String> oldRoles = oldStatus.getRoles();
+ final Set<String> newRoles = status.getRoles();
+ // Roles in the old status that are missing from the new one.
+ final Set<String> rolesRemoved = new HashSet<>(oldRoles);
+ rolesRemoved.removeAll(newRoles);
+ // Roles in the new status that were not in the old one.
+ final Set<String> rolesAdded = new HashSet<>(newRoles);
+ rolesAdded.removeAll(oldRoles);
+
+ if (!rolesRemoved.isEmpty()) {
+ sb.append("Relinquished role");
+ if (rolesRemoved.size() != 1) { // pluralize unless exactly one role
+ sb.append("s");
+ }
+ // Appending the Set itself uses its toString(), i.e. the bracketed, comma-separated form.
+ sb.append(" ").append(rolesRemoved);
+ }
+
+ if (!rolesAdded.isEmpty()) {
+ if (sb.length() > 0) { // a "Relinquished ..." part was already written; separate the two parts
+ sb.append("; ");
+ }
+
+ sb.append("Acquired role");
+ if (rolesAdded.size() != 1) { // pluralize unless exactly one role
+ sb.append("s");
+ }
+
+ sb.append(" ").append(rolesAdded);
+ }
+ } else { // no previous status is known, or the connection state differs from the previous one
+ sb.append("Node Status changed from ").append(oldStatus == null ? "[Unknown Node]" : oldStatus.getState().toString()).append(" to ").append(status.getState().toString());
+ if (status.getState() == NodeConnectionState.CONNECTED) { // connected: list the roles instead of a cause
+ sb.append(" (Roles=").append(status.getRoles().toString()).append(")");
+ } else if (status.getDisconnectReason() != null) { // the reason text is preferred over the code
+ sb.append(" due to ").append(status.getDisconnectReason());
+ } else if (status.getDisconnectCode() != null) {
+ sb.append(" due to ").append(status.getDisconnectCode().toString());
+ }
+ }
+ // Empty when the state is unchanged and no roles were added or removed.
+ return sb.toString();
+ }
+
private void handleNodeStatusChange(final NodeStatusChangeMessage statusChangeMessage) {
final NodeConnectionStatus updatedStatus = statusChangeMessage.getNodeConnectionStatus();
final NodeIdentifier nodeId = statusChangeMessage.getNodeId();
@@ -790,14 +840,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
logger.info("Status of {} changed from {} to {}", statusChangeMessage.getNodeId(), oldStatus, updatedStatus);
final NodeConnectionStatus status = statusChangeMessage.getNodeConnectionStatus();
- final StringBuilder sb = new StringBuilder();
- sb.append("Connection Status changed to ").append(status.getState().toString());
- if (status.getDisconnectReason() != null) {
- sb.append(" due to ").append(status.getDisconnectReason());
- } else if (status.getDisconnectCode() != null) {
- sb.append(" due to ").append(status.getDisconnectCode().toString());
+ final String summary = summarizeStatusChange(oldStatus, status);
+ if (!StringUtils.isEmpty(summary)) {
+ addNodeEvent(nodeId, summary);
}
- addNodeEvent(nodeId, sb.toString());
// Update our counter so that we are in-sync with the cluster on the
// most up-to-date version of the NodeConnectionStatus' Update Identifier.
http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 5de77f6..2649895 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -565,7 +565,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
- heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false, new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED)));
+ this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
+ heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
if (configuredForClustering) {
leaderElectionManager = new CuratorLeaderElectionManager(4);
@@ -1459,7 +1460,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
} finally {
writeLock.unlock();
}
@@ -3349,7 +3350,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
} finally {
writeLock.unlock();
}
@@ -3386,7 +3387,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
- final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary, connectionStatus));
+ final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary));
// Emit a bulletin detailing the fact that the primary node state has changed
if (oldBean == null || oldBean.isPrimary() != primary) {
@@ -3754,7 +3755,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isConnected() {
rwLock.readLock().lock();
try {
- return connectionStatus.getState() == NodeConnectionState.CONNECTED;
+ return connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED;
} finally {
rwLock.readLock().unlock();
}
@@ -3766,7 +3767,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
this.connectionStatus = connectionStatus;
// update the heartbeat bean
- this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary(), connectionStatus));
+ this.heartbeatBeanRef.set(new HeartbeatBean(rootGroup, isPrimary()));
} finally {
rwLock.writeLock().unlock();
}
@@ -3837,8 +3838,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (bean == null) {
readLock.lock();
try {
- final NodeConnectionStatus connectionStatus = new NodeConnectionStatus(getNodeId(), DisconnectionCode.NOT_YET_CONNECTED);
- bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary(), connectionStatus);
+ bean = new HeartbeatBean(getGroup(getRootGroupId()), isPrimary());
} finally {
readLock.unlock();
}
@@ -3868,7 +3868,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
roles.add(ClusterRoles.CLUSTER_COORDINATOR);
}
- final Heartbeat heartbeat = new Heartbeat(nodeId, roles, bean.getConnectionStatus(), hbPayload.marshal());
+ final Heartbeat heartbeat = new Heartbeat(nodeId, roles, connectionStatus, hbPayload.marshal());
final HeartbeatMessage message = new HeartbeatMessage();
message.setHeartbeat(heartbeat);
@@ -4002,12 +4002,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private static class HeartbeatBean {
private final ProcessGroup rootGroup;
private final boolean primary;
- private final NodeConnectionStatus connectionStatus;
- public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary, final NodeConnectionStatus connectionStatus) {
+ public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary) {
this.rootGroup = rootGroup;
this.primary = primary;
- this.connectionStatus = connectionStatus;
}
public ProcessGroup getRootGroup() {
@@ -4017,9 +4015,5 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isPrimary() {
return primary;
}
-
- public NodeConnectionStatus getConnectionStatus() {
- return connectionStatus;
- }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 4f286eb..71d66b5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -16,6 +16,37 @@
*/
package org.apache.nifi.controller;
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AbstractPolicyBasedAuthorizer;
import org.apache.nifi.authorization.Authorizer;
@@ -63,37 +94,6 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
public class StandardFlowService implements FlowService, ProtocolHandler {
private static final String EVENT_CATEGORY = "Controller";
@@ -444,7 +444,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
* the response will be null and we should load the local dataflow
* and heartbeat until a manager is located.
*/
- final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow, encryptor);
+ final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow);
final ConnectionResponse response = connect(localFlowEmpty, localFlowEmpty);
// obtain write lock while we are updating the controller. We need to ensure that we don't
http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index d6dee2c..bc28380 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -124,7 +124,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
}
- public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
+ public static boolean isEmpty(final DataFlow dataFlow) {
if (dataFlow == null || dataFlow.getFlow() == null || dataFlow.getFlow().length == 0) {
return true;
}
@@ -135,7 +135,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(rootGroupElement);
- final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
+ final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion);
return isEmpty(rootGroupDto);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
index 1409df4..2c51e96 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java
@@ -260,7 +260,9 @@ public class FlowFromDOMFactory {
dto.setProxyHost(getString(element, "proxyHost"));
dto.setProxyPort(getOptionalInt(element, "proxyPort"));
dto.setProxyUser(getString(element, "proxyUser"));
- String proxyPassword = decrypt(getString(element, "proxyPassword"), encryptor);
+
+ final String rawPassword = getString(element, "proxyPassword");
+ final String proxyPassword = encryptor == null ? rawPassword : decrypt(rawPassword, encryptor);
dto.setProxyPassword(proxyPassword);
return dto;
@@ -395,7 +397,9 @@ public class FlowFromDOMFactory {
final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
for (final Element propertyElement : propertyNodeList) {
final String name = getString(propertyElement, "name");
- final String value = decrypt(getString(propertyElement, "value"), encryptor);
+
+ final String rawPropertyValue = getString(propertyElement, "value");
+ final String value = encryptor == null ? rawPropertyValue : decrypt(rawPropertyValue, encryptor);
properties.put(name, value);
}
return properties;
http://git-wip-us.apache.org/repos/asf/nifi/blob/52bc23f5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
index 8b0d18f..f73dce5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardXMLFlowConfigurationDAO.java
@@ -80,8 +80,16 @@ public final class StandardXMLFlowConfigurationDAO implements FlowConfigurationD
final FlowSynchronizer flowSynchronizer = new StandardFlowSynchronizer(encryptor);
controller.synchronize(flowSynchronizer, dataFlow);
- // save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
- save(controller);
+ if (StandardFlowSynchronizer.isEmpty(dataFlow)) {
+ // If the dataflow is empty, we want to save it. We do this because when we start up a brand new cluster with no
+ // dataflow, we need to ensure that the flow is consistent across all nodes in the cluster and that upon restart
+ // of NiFi, the root group ID does not change. However, we don't always want to save it, because if the flow is
+ // not empty, then we can get into a bad situation, since the Processors, etc. don't have the appropriate "Scheduled
+ // State" yet (since they haven't yet been scheduled). So if there are components in the flow and we save it, we
+ // may end up saving the flow in such a way that all components are stopped.
+ // We save based on the controller, not the provided data flow because Process Groups may contain 'local' templates.
+ save(controller);
+ }
}
@Override