You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2022/03/14 19:45:14 UTC

[nifi] branch main updated: NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted. Also if we fail to notify the node that it needs to offload its data, change its state back to DISCONNECTED. (#5865)

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 21922af  NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted. Also if we fail to notify the node that it needs to offload its data, change its state back to DISCONNECTED. (#5865)
21922af is described below

commit 21922af90cf5b9b31995d1489d1c80d803dd6842
Author: markap14 <ma...@hotmail.com>
AuthorDate: Mon Mar 14 15:45:02 2022 -0400

    NIFI-9794: If a node is OFFLOADING, do not allow connections to be deleted. Also if we fail to notify the node that it needs to offload its data, change its state back to DISCONNECTED. (#5865)
---
 .../replication/ThreadPoolRequestReplicator.java   | 29 ++++++++++++++++++----
 .../coordination/node/NodeClusterCoordinator.java  |  5 ++++
 2 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 343bc56..fe39c8a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -24,12 +24,14 @@ import org.apache.nifi.authorization.user.NiFiUserUtils;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
 import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
+import org.apache.nifi.cluster.coordination.http.endpoints.ConnectionEndpointMerger;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.manager.NodeResponse;
 import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
 import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.exception.UriConstructionException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -164,9 +166,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
     @Override
     public AsyncClusterResponse replicate(NiFiUser user, String method, URI uri, Object entity, Map<String, String> headers) {
         final Map<NodeConnectionState, List<NodeIdentifier>> stateMap = clusterCoordinator.getConnectionStates();
-        final boolean mutable = isMutableRequest(method, uri.getPath());
+        final boolean mutable = isMutableRequest(method);
 
-        // If the request is mutable, ensure that all nodes are connected.
+        // If the request is mutable, ensure the cluster is in an appropriate state: there can be no CONNECTING nodes (otherwise, a node could receive the dataflow and the dataflow could then be
+        // modified before the node has fully loaded it), and we cannot delete a connection while a node is OFFLOADING (otherwise, we could delete a connection while a node is trying to push data to it).
         if (mutable) {
             final List<NodeIdentifier> connecting = stateMap.get(NodeConnectionState.CONNECTING);
             if (connecting != null && !connecting.isEmpty()) {
@@ -176,6 +179,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
                     throw new ConnectingNodeMutableRequestException(connecting.size() + " Nodes are currently connecting");
                 }
             }
+
+            if (isDeleteConnection(method, uri.getPath())) {
+                final List<NodeIdentifier> offloading = stateMap.get(NodeConnectionState.OFFLOADING);
+                if (offloading != null && !offloading.isEmpty()) {
+                    throw new OffloadedNodeMutableRequestException("Cannot delete conection because the following Nodes are currently being offloaded: " + offloading);
+                }
+            }
         }
 
         final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED);
@@ -243,7 +253,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             // performing an action, rather than simply proxying the request to the cluster coordinator. In this case,
             // we need to ensure that we use proper locking. We don't want two requests modifying the flow at the same
             // time, so we use a write lock if the request is mutable and a read lock otherwise.
-            final Lock lock = isMutableRequest(method, uri.getPath()) ? writeLock : readLock;
+            final Lock lock = isMutableRequest(method) ? writeLock : readLock;
             logger.debug("Obtaining lock {} in order to replicate request {} {}", lock, method, uri);
             lock.lock();
             try {
@@ -394,7 +404,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
             // issue the request. This is all handled by calling performVerification, which will replicate
             // the 'vote' request to all nodes and then if successful will call back into this method to
             // replicate the actual request.
-            final boolean mutableRequest = isMutableRequest(method, uri.getPath());
+            final boolean mutableRequest = isMutableRequest(method);
             if (mutableRequest && performVerification) {
                 logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
                 performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge, monitor);
@@ -617,7 +627,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         return nodeResponse;
     }
 
-    private boolean isMutableRequest(final String method, final String uriPath) {
+    private boolean isMutableRequest(final String method) {
         switch (method.toUpperCase()) {
             case HttpMethod.GET:
             case HttpMethod.HEAD:
@@ -628,6 +638,15 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
         }
     }
 
+    // True only for an HTTP DELETE whose path matches ConnectionEndpointMerger.CONNECTION_URI_PATTERN.
+    private boolean isDeleteConnection(final String method, final String uriPath) {
+        final boolean deleteRequested = HttpMethod.DELETE.equalsIgnoreCase(method);
+        if (deleteRequested) {
+            return ConnectionEndpointMerger.CONNECTION_URI_PATTERN.matcher(uriPath).matches();
+        }
+        return false;
+    }
+
     /**
      * Verifies that the cluster is in a state that will allow requests to be made using the given HTTP Method and URI path
      *
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 5326073..c0034ce 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -950,6 +950,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                     try {
                         senderListener.offload(request);
                         reportEvent(nodeId, Severity.INFO, "Node was offloaded due to " + request.getExplanation());
+
                         future.complete(null);
                         return;
                     } catch (final Exception e) {
@@ -965,6 +966,10 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
                     }
                 }
 
+                updateNodeStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.DISCONNECTED, null,
+                    "Attempted to offload node but failed to notify node that it was to offload its data. State reset to disconnected."));
+                addNodeEvent(nodeId, "Failed to initiate node offload: " + lastException);
+
                 future.completeExceptionally(lastException);
             }
         }, "Offload " + request.getNodeId());