You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/12/05 11:40:14 UTC
[nifi] branch main updated: NIFI-9442: When deleting a connection, ensure that when the source of the connection is a funnel that its upstream components are checked (recursively)
This is an automated email from the ASF dual-hosted git repository.
jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2a3f7da NIFI-9442: When deleting a connection, ensure that when the source of the connection is a funnel that its upstream components are checked (recursively)
2a3f7da is described below
commit 2a3f7dafd657a8f1a2d638688889710df3ef8acd
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Dec 3 14:14:03 2021 -0500
NIFI-9442: When deleting a connection, ensure that when the source of the connection is a funnel that its upstream components are checked (recursively)
Signed-off-by: Joe Gresock <jg...@gmail.com>
This closes #5568.
---
.../nifi/connectable/StandardConnection.java | 33 ++++++++++++++++++----
1 file changed, 28 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 1cf78b6..a866c32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -497,11 +497,16 @@ public final class StandardConnection implements Connection, ConnectionEventList
throw new IllegalStateException("Queue not empty for " + this.getIdentifier());
}
- if (source.isRunning()) {
- if (!ConnectableType.FUNNEL.equals(source.getConnectableType())) {
- throw new IllegalStateException("Source of Connection (" + source.getIdentifier() + ") is running");
- }
- }
+ // The source must be stopped unless it is a Funnel. Funnels cannot be stopped & started. But if the source is a Funnel,
+ // it means that its sources must also be stopped, and the check must go on recursively.
+ // This is important for a case in which we have a cluster where two processors (for example) are connected with a funnel in between.
+ // In this case, if a user deletes the connection between the funnel and its destination, the web request that is made will be done in two
+ // phases: (1) Verify that the request is valid and (2) Delete the connection. But if we don't recursively ensure that the upstream components
+ // are stopped, we could have all nodes in the cluster verify the request is valid in the first phase. But before the second phase occurs, one
+ // node may now have data within the Connection, so the second phase (the delete) will fail. In that situation, the node's dataflow will differ
+ // from the rest of the cluster, and the node will be kicked out of the cluster. To avoid this, we simply ensure that the source is stopped,
+ // and if the source is a funnel (which can't be stopped) that its sources are stopped.
+ verifySourceStoppedOrFunnel(this);
final Connectable dest = destination.get();
if (dest.isRunning()) {
@@ -511,6 +516,24 @@ public final class StandardConnection implements Connection, ConnectionEventList
}
}
+ private void verifySourceStoppedOrFunnel(final Connection connection) {
+ final Connectable sourceComponent = connection.getSource();
+ if (!sourceComponent.isRunning()) {
+ return;
+ }
+
+ final ConnectableType connectableType = sourceComponent.getConnectableType();
+ if (connectableType != ConnectableType.FUNNEL) {
+ // Source is running and not a funnel. Source is not considered stopped.
+ throw new IllegalStateException("Upstream component of Connection (" + sourceComponent + ") is running");
+ }
+
+ // Source is a funnel and is running. We need to then check all of its upstream components.
+ for (final Connection incoming : source.getIncomingConnections()) {
+ verifySourceStoppedOrFunnel(incoming);
+ }
+ }
+
@Override
public Optional<String> getVersionedComponentId() {
return Optional.ofNullable(versionedComponentId.get());