You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:36 UTC

[13/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java
new file mode 100644
index 0000000..412a555
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when an HTTP request that may change a node's dataflow is to be
+ * replicated while one or more nodes are disconnected. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class DisconnectedNodeMutableRequestException extends MutableRequestException {
+
+    public DisconnectedNodeMutableRequestException() {
+    }
+
+    public DisconnectedNodeMutableRequestException(String msg) {
+        super(msg);
+    }
+
+    public DisconnectedNodeMutableRequestException(Throwable cause) {
+        super(cause);
+    }
+
+    public DisconnectedNodeMutableRequestException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
new file mode 100644
index 0000000..6c4e670
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Signals that an operation to be performed on a cluster has been invoked at an
+ * illegal or inappropriate time. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class IllegalClusterStateException extends ClusterException {
+
+    public IllegalClusterStateException() {
+    }
+
+    public IllegalClusterStateException(String msg) {
+        super(msg);
+    }
+
+    public IllegalClusterStateException(Throwable cause) {
+        super(cause);
+    }
+
+    public IllegalClusterStateException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java
new file mode 100644
index 0000000..adef62a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a deletion request is issued to a node
+ * that cannot be deleted (e.g., the node is not disconnected). Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class IllegalNodeDeletionException extends IllegalClusterStateException {
+
+    public IllegalNodeDeletionException() {
+    }
+
+    public IllegalNodeDeletionException(String msg) {
+        super(msg);
+    }
+
+    public IllegalNodeDeletionException(Throwable cause) {
+        super(cause);
+    }
+
+    public IllegalNodeDeletionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
new file mode 100644
index 0000000..7e61b24
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a disconnection request is issued to a
+ * node that cannot be disconnected (e.g., last node in cluster, node is primary
+ * node). Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class IllegalNodeDisconnectionException extends IllegalClusterStateException {
+
+    public IllegalNodeDisconnectionException() {
+    }
+
+    public IllegalNodeDisconnectionException(String msg) {
+        super(msg);
+    }
+
+    public IllegalNodeDisconnectionException(Throwable cause) {
+        super(cause);
+    }
+
+    public IllegalNodeDisconnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java
new file mode 100644
index 0000000..96c76bc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a reconnection request is issued to a node that cannot be
+ * reconnected (e.g., the node is not disconnected). Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class IllegalNodeReconnectionException extends IllegalClusterStateException {
+
+    public IllegalNodeReconnectionException() {
+    }
+
+    public IllegalNodeReconnectionException(String msg) {
+        super(msg);
+    }
+
+    public IllegalNodeReconnectionException(Throwable cause) {
+        super(cause);
+    }
+
+    public IllegalNodeReconnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java
new file mode 100644
index 0000000..4b0097a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the primary role cannot be assigned to a
+ * node because the node is ineligible for the role. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class IneligiblePrimaryNodeException extends IllegalClusterStateException {
+
+    public IneligiblePrimaryNodeException() {
+    }
+
+    public IneligiblePrimaryNodeException(String msg) {
+        super(msg);
+    }
+
+    public IneligiblePrimaryNodeException(Throwable cause) {
+        super(cause);
+    }
+
+    public IneligiblePrimaryNodeException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java
new file mode 100644
index 0000000..d160587
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when an HTTP request that may change a node's state is to be
+ * replicated while the cluster or connected nodes are unable to change their state (e.g., a new
+ * node is connecting to the cluster). Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class MutableRequestException extends IllegalClusterStateException {
+
+    public MutableRequestException() {
+    }
+
+    public MutableRequestException(String msg) {
+        super(msg);
+    }
+
+    public MutableRequestException(Throwable cause) {
+        super(cause);
+    }
+
+    public MutableRequestException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
new file mode 100644
index 0000000..8d704b9
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the cluster is unable to service a
+ * request because no nodes are connected. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class NoConnectedNodesException extends ClusterException {
+
+    public NoConnectedNodesException() {
+    }
+
+    public NoConnectedNodesException(String msg) {
+        super(msg);
+    }
+
+    public NoConnectedNodesException(Throwable cause) {
+        super(cause);
+    }
+
+    public NoConnectedNodesException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
new file mode 100644
index 0000000..9e17a23
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the cluster is unable to service a
+ * request because no nodes returned a response. When the given request is not
+ * mutable the nodes are left in their previous state. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class NoResponseFromNodesException extends ClusterException {
+
+    public NoResponseFromNodesException() {
+    }
+
+    public NoResponseFromNodesException(String msg) {
+        super(msg);
+    }
+
+    public NoResponseFromNodesException(Throwable cause) {
+        super(cause);
+    }
+
+    public NoResponseFromNodesException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
new file mode 100644
index 0000000..3bd2f4b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a disconnection request to a node
+ * failed. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class NodeDisconnectionException extends ClusterException {
+
+    public NodeDisconnectionException() {
+    }
+
+    public NodeDisconnectionException(String msg) {
+        super(msg);
+    }
+
+    public NodeDisconnectionException(Throwable cause) {
+        super(cause);
+    }
+
+    public NodeDisconnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
new file mode 100644
index 0000000..8c40cef
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Signals that a reconnection request to a node failed. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class NodeReconnectionException extends ClusterException {
+
+    public NodeReconnectionException() {
+    }
+
+    public NodeReconnectionException(String msg) {
+        super(msg);
+    }
+
+    public NodeReconnectionException(Throwable cause) {
+        super(cause);
+    }
+
+    public NodeReconnectionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java
new file mode 100644
index 0000000..403f7a5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the cluster is unable to update the
+ * primary role of a node. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class PrimaryRoleAssignmentException extends IllegalClusterStateException {
+
+    public PrimaryRoleAssignmentException() {
+    }
+
+    public PrimaryRoleAssignmentException(String msg) {
+        super(msg);
+    }
+
+    public PrimaryRoleAssignmentException(Throwable cause) {
+        super(cause);
+    }
+
+    public PrimaryRoleAssignmentException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java
new file mode 100644
index 0000000..f544f26
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when an HTTP request that may change a node's dataflow is to be
+ * replicated while the cluster is in safe mode. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class SafeModeMutableRequestException extends MutableRequestException {
+
+    public SafeModeMutableRequestException() {
+    }
+
+    public SafeModeMutableRequestException(String msg) {
+        super(msg);
+    }
+
+    public SafeModeMutableRequestException(Throwable cause) {
+        super(cause);
+    }
+
+    public SafeModeMutableRequestException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
new file mode 100644
index 0000000..914bb56
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a request is made for a node that does
+ * not exist. Adds only pass-through constructors.
+ *
+ * @author unattributed
+ */
+public class UnknownNodeException extends ClusterException {
+
+    public UnknownNodeException() {
+    }
+
+    public UnknownNodeException(String msg) {
+        super(msg);
+    }
+
+    public UnknownNodeException(Throwable cause) {
+        super(cause);
+    }
+
+    public UnknownNodeException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java
new file mode 100644
index 0000000..773d7b5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Thrown when a URI cannot be constructed from the given information. It
+ * plays the same role as Java's URISyntaxException, but because it extends
+ * RuntimeException callers are not forced to catch or declare it.
+ *
+ * @author unattributed
+ */
+public class UriConstructionException extends RuntimeException {
+
+    /**
+     * Creates an exception that carries neither a detail message nor a cause.
+     */
+    public UriConstructionException() {
+        super();
+    }
+
+    /**
+     * Creates an exception with both a detail message and a cause.
+     *
+     * @param msg the detail message
+     * @param cause the underlying cause
+     */
+    public UriConstructionException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Creates an exception with a detail message only.
+     *
+     * @param msg the detail message
+     */
+    public UriConstructionException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception that wraps an underlying cause.
+     *
+     * @param cause the underlying cause
+     */
+    public UriConstructionException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
new file mode 100644
index 0000000..2015530
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.reporting.EventAccess;
+
+/**
+ * An <code>EventAccess</code> backed by a <code>WebClusterManager</code>. The
+ * controller status is read from the cluster manager; every provenance
+ * operation is a no-op that yields an empty list or null.
+ */
+public class ClusteredEventAccess implements EventAccess {
+
+    private final WebClusterManager clusterManager;
+
+    /**
+     * @param clusterManager the cluster manager that is asked for the process
+     * group status
+     */
+    public ClusteredEventAccess(final WebClusterManager clusterManager) {
+        this.clusterManager = clusterManager;
+    }
+
+    /**
+     * @return the status the cluster manager reports for the group identified
+     * by <code>WebClusterManager.ROOT_GROUP_ID_ALIAS</code>
+     */
+    @Override
+    public ProcessGroupStatus getControllerStatus() {
+        return clusterManager.getProcessGroupStatus(WebClusterManager.ROOT_GROUP_ID_ALIAS);
+    }
+
+    /**
+     * @return always a new, empty, mutable list
+     */
+    @Override
+    public List<ProvenanceEventRecord> getProvenanceEvents(long arg0, int arg1) throws IOException {
+        final List<ProvenanceEventRecord> noEvents = new ArrayList<>();
+        return noEvents;
+    }
+
+    /**
+     * @return a new repository instance, on every call, whose operations all
+     * do nothing
+     */
+    @Override
+    public ProvenanceEventRepository getProvenanceRepository() {
+        // NCM doesn't have provenance events, because it doesn't process FlowFiles.
+        // So we just use a Provenance Event Repository that does nothing.
+        return new NoOpProvenanceEventRepository();
+    }
+
+    /**
+     * A repository that records nothing and finds nothing: mutators are empty,
+     * list-returning queries yield a new empty list, and all other queries
+     * yield null.
+     */
+    private static final class NoOpProvenanceEventRepository implements ProvenanceEventRepository {
+
+        // ---- lifecycle ----
+
+        @Override
+        public void initialize(EventReporter eventReporter) throws IOException {
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        // ---- event registration ----
+
+        @Override
+        public ProvenanceEventBuilder eventBuilder() {
+            return null;
+        }
+
+        @Override
+        public void registerEvent(final ProvenanceEventRecord event) {
+        }
+
+        @Override
+        public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+        }
+
+        // ---- event retrieval ----
+
+        @Override
+        public ProvenanceEventRecord getEvent(long eventId) throws IOException {
+            return null;
+        }
+
+        @Override
+        public List<ProvenanceEventRecord> getEvents(long startEventId, int maxEvents) throws IOException {
+            final List<ProvenanceEventRecord> noEvents = new ArrayList<>();
+            return noEvents;
+        }
+
+        @Override
+        public Long getMaxEventId() {
+            return null;
+        }
+
+        // ---- searching ----
+
+        @Override
+        public List<SearchableField> getSearchableFields() {
+            final List<SearchableField> noFields = new ArrayList<>();
+            return noFields;
+        }
+
+        @Override
+        public List<SearchableField> getSearchableAttributes() {
+            final List<SearchableField> noAttributes = new ArrayList<>();
+            return noAttributes;
+        }
+
+        @Override
+        public QuerySubmission submitQuery(final Query query) {
+            return null;
+        }
+
+        @Override
+        public QuerySubmission retrieveQuerySubmission(final String submissionId) {
+            return null;
+        }
+
+        // ---- lineage ----
+
+        @Override
+        public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) {
+            return null;
+        }
+
+        @Override
+        public ComputeLineageSubmission submitExpandParents(final long eventId) {
+            return null;
+        }
+
+        @Override
+        public ComputeLineageSubmission submitExpandChildren(final long eventId) {
+            return null;
+        }
+
+        @Override
+        public ComputeLineageSubmission retrieveLineageSubmission(final String submissionId) {
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
new file mode 100644
index 0000000..e546f87
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.processor.StandardPropertyValue;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * A <code>ReportingContext</code> built from an <code>EventAccess</code>, a
+ * <code>BulletinRepository</code>, a set of configured property values and a
+ * <code>ControllerServiceProvider</code>. Each property value (or the
+ * descriptor's default when the configured value is null) is prepared as a
+ * query once, at construction time.
+ */
+public class ClusteredReportingContext implements ReportingContext {
+
+    private final EventAccess eventAccess;
+    private final BulletinRepository bulletinRepository;
+    private final ControllerServiceProvider serviceProvider;
+    private final Map<PropertyDescriptor, String> properties;
+    private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
+
+    /**
+     * @param eventAccess source of the controller status used to resolve
+     * component names and group ids
+     * @param bulletinRepository the repository exposed to reporting tasks
+     * @param properties the configured property values; exposed through an
+     * unmodifiable view of this map
+     * @param serviceProvider used for controller service lookup and property
+     * value resolution
+     */
+    public ClusteredReportingContext(final EventAccess eventAccess, final BulletinRepository bulletinRepository,
+            final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider) {
+        this.eventAccess = eventAccess;
+        this.bulletinRepository = bulletinRepository;
+        this.properties = Collections.unmodifiableMap(properties);
+        this.serviceProvider = serviceProvider;
+
+        preparedQueries = new HashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
+            final PropertyDescriptor desc = entry.getKey();
+            String value = entry.getValue();
+            if (value == null) {
+                // fall back to the descriptor's default, mirroring getProperty()
+                value = desc.getDefaultValue();
+            }
+
+            final PreparedQuery pq = Query.prepare(value);
+            preparedQueries.put(desc, pq);
+        }
+    }
+
+    @Override
+    public EventAccess getEventAccess() {
+        return eventAccess;
+    }
+
+    @Override
+    public BulletinRepository getBulletinRepository() {
+        return bulletinRepository;
+    }
+
+    /**
+     * Creates a bulletin that is not tied to a particular component.
+     */
+    @Override
+    public Bulletin createBulletin(final String category, final Severity severity, final String message) {
+        return BulletinFactory.createBulletin(category, severity.name(), message);
+    }
+
+    /**
+     * Creates a bulletin for the given component. The component's group id and
+     * name are looked up in the current controller status; either may be null
+     * if the component is not found there.
+     */
+    @Override
+    public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
+        final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
+        final String groupId = findGroupId(rootGroupStatus, componentId);
+        final String componentName = findComponentName(rootGroupStatus, componentId);
+
+        return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message);
+    }
+
+    /**
+     * @return an unmodifiable view of the configured properties
+     */
+    @Override
+    public Map<PropertyDescriptor, String> getProperties() {
+        // the field already holds an unmodifiable view; no need to wrap it again
+        return properties;
+    }
+
+    /**
+     * @return the configured value of the property, or the descriptor's
+     * default when no value is configured
+     */
+    @Override
+    public PropertyValue getProperty(final PropertyDescriptor property) {
+        final String configuredValue = properties.get(property);
+        return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, serviceProvider, preparedQueries.get(property));
+    }
+
+    @Override
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return serviceProvider;
+    }
+
+    /**
+     * Searches the given group, then its child groups recursively, for a
+     * processor, input port or output port with the given id.
+     *
+     * @return the id of the group that directly contains the component, or
+     * null if no such component is found
+     */
+    String findGroupId(final ProcessGroupStatus groupStatus, final String componentId) {
+        for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+            if (procStatus.getId().equals(componentId)) {
+                return groupStatus.getId();
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return groupStatus.getId();
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return groupStatus.getId();
+            }
+        }
+
+        for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+            final String groupId = findGroupId(childGroup, componentId);
+            if (groupId != null) {
+                return groupId;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Searches the given group, then its child groups recursively, for a
+     * processor, input port or output port with the given id.
+     *
+     * @return the name of the matching component itself, or null if no such
+     * component is found
+     */
+    private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) {
+        for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+            if (procStatus.getId().equals(componentId)) {
+                return procStatus.getName();
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                // the port's own name, not the name of the group that contains it
+                return portStatus.getName();
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                // the port's own name, not the name of the group that contains it
+                return portStatus.getName();
+            }
+        }
+
+        for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+            final String componentName = findComponentName(childGroup, componentId);
+            if (componentName != null) {
+                return componentName;
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
new file mode 100644
index 0000000..81bb7a7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.nifi.cluster.manager.HttpRequestReplicator;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.FormatUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the <code>HttpRequestReplicator</code> interface. This
+ * implementation parallelizes the node HTTP requests using the given
+ * <code>ExecutorService</code> instance. Individual requests may have
+ * connection and read timeouts set, which may be set during instance
+ * construction. Otherwise, the default is not to timeout.
+ *
+ * If a node protocol scheme is provided during construction, then all requests
+ * will be replicated using the given scheme. If null is provided as the scheme
+ * (the default), then the requests will be replicated using the scheme of the
+ * original URI.
+ *
+ * Clients must call start() and stop() to initialize and shutdown the instance.
+ * The instance must be started before issuing any replication requests.
+ *
+ * @author unattributed
+ */
+public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
+
+    // default number of seconds stop() waits for the executor to terminate
+    private static final int DEFAULT_SHUTDOWN_REPLICATOR_SECONDS = 30;
+
+    // logger
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(HttpRequestReplicatorImpl.class));
+
+    // final members, all assigned in the constructor
+    private final Client client;            // the client to use for issuing requests
+    private final int numThreads;           // number of threads to use for request replication
+    private final int connectionTimeoutMs;  // connection timeout per node request, in milliseconds
+    private final int readTimeoutMs;        // read timeout per node request, in milliseconds
+
+    // members
+    // created by start(); null until then
+    private ExecutorService executorService;
+    // how long stop() waits for termination; a non-positive value makes stop() use shutdownNow()
+    private int shutdownReplicatorSeconds = DEFAULT_SHUTDOWN_REPLICATOR_SECONDS;
+
+    // guarded by synchronized method access in support of multithreaded replication;
+    // blank/null means "use the scheme of the URI being replicated"
+    private String nodeProtocolScheme = null;
+
+    /**
+     * Creates an instance. The connection timeout and read timeout will be
+     * infinite.
+     *
+     * @param numThreads the number of threads to use when parallelizing
+     * requests
+     * @param client a client for making requests
+     */
+    public HttpRequestReplicatorImpl(final int numThreads, final Client client) {
+        this(numThreads, client, "0 sec", "0 sec");
+    }
+
+    /**
+     * Creates an instance.
+     *
+     * @param numThreads the number of threads to use when parallelizing
+     * requests
+     * @param client a client for making requests
+     * @param connectionTimeoutMs the connection timeout specified in
+     * milliseconds
+     * @param readTimeoutMs the read timeout specified in milliseconds
+     */
+    public HttpRequestReplicatorImpl(final int numThreads, final Client client, final String connectionTimeout, final String readTimeout) {
+
+        if (numThreads <= 0) {
+            throw new IllegalArgumentException("The number of threads must be greater than zero.");
+        } else if (client == null) {
+            throw new IllegalArgumentException("Client may not be null.");
+        }
+
+        this.numThreads = numThreads;
+        this.client = client;
+        this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
+        this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
+
+        client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeoutMs);
+        client.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeoutMs);
+        client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
+    }
+
+    /**
+     * Creates the fixed-size thread pool used for replicating requests.
+     *
+     * @throws IllegalStateException if the instance is already running
+     */
+    @Override
+    public void start() {
+        final boolean alreadyStarted = isRunning();
+        if (alreadyStarted) {
+            throw new IllegalStateException("Instance is already started.");
+        }
+
+        executorService = Executors.newFixedThreadPool(numThreads);
+    }
+
+    /**
+     * @return true if an executor has been created and it has not been shut
+     * down; false otherwise
+     */
+    @Override
+    public boolean isRunning() {
+        if (executorService == null) {
+            // never started
+            return false;
+        }
+        return !executorService.isShutdown();
+    }
+
+    /**
+     * Shuts the executor down and waits up to the configured number of
+     * seconds for it to terminate. A non-positive wait uses
+     * <code>shutdownNow()</code>; otherwise an orderly <code>shutdown()</code>
+     * is requested. The outcome is logged in either case.
+     *
+     * @throws IllegalStateException if the instance is not running
+     */
+    @Override
+    public void stop() {
+
+        if (!isRunning()) {
+            throw new IllegalStateException("Instance is already stopped.");
+        }
+
+        try {
+            // with a positive grace period let in-flight work finish; otherwise stop it now
+            if (getShutdownReplicatorSeconds() > 0) {
+                executorService.shutdown();
+            } else {
+                executorService.shutdownNow();
+            }
+            executorService.awaitTermination(getShutdownReplicatorSeconds(), TimeUnit.SECONDS);
+        } catch (final InterruptedException ex) {
+            // keep the interrupt visible to whoever called us
+            Thread.currentThread().interrupt();
+        } finally {
+            if (!executorService.isTerminated()) {
+                logger.warn("HTTP Request Replicator has not terminated properly.  There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
+            } else {
+                logger.info("HTTP Request Replicator has been terminated successfully.");
+            }
+        }
+    }
+
+    /**
+     * Chooses the protocol scheme used for requests sent to nodes.
+     *
+     * @param nodeProtocolScheme "http" or "https" (compared ignoring case), or
+     * a null/blank value. With a null or blank value the scheme of the
+     * originating request is used when that request is replicated.
+     *
+     * @throws IllegalArgumentException if a non-blank value other than "http"
+     * or "https" is given
+     */
+    public synchronized void setNodeProtocolScheme(final String nodeProtocolScheme) {
+        final boolean isHttp = "http".equalsIgnoreCase(nodeProtocolScheme);
+        final boolean isHttps = "https".equalsIgnoreCase(nodeProtocolScheme);
+
+        // blank values are accepted as-is; anything else has to be one of the two schemes
+        if (StringUtils.isNotBlank(nodeProtocolScheme) && !(isHttp || isHttps)) {
+            throw new IllegalArgumentException("Node Protocol Scheme must be either HTTP or HTTPS");
+        }
+
+        this.nodeProtocolScheme = nodeProtocolScheme;
+    }
+
+    /**
+     * @return the configured node protocol scheme exactly as it was set; may be
+     * null or blank when no scheme has been configured
+     */
+    public synchronized String getNodeProtocolScheme() {
+        return nodeProtocolScheme;
+    }
+
+    /**
+     * Resolves the scheme to use for replicating a request made to the given
+     * URI.
+     *
+     * @param uri the URI of the originating request
+     * @return the configured scheme, or the URI's own scheme when no scheme is
+     * configured
+     */
+    private synchronized String getNodeProtocolScheme(final URI uri) {
+        // an unset (blank) configuration defers to the scheme of the request itself
+        return StringUtils.isBlank(nodeProtocolScheme) ? uri.getScheme() : nodeProtocolScheme;
+    }
+
+    /**
+     * @return the connection timeout per node request, in milliseconds
+     */
+    public int getConnectionTimeoutMs() {
+        return connectionTimeoutMs;
+    }
+
+    /**
+     * @return the read timeout per node request, in milliseconds
+     */
+    public int getReadTimeoutMs() {
+        return readTimeoutMs;
+    }
+
+    /**
+     * @return the number of seconds stop() waits for the executor to terminate
+     */
+    public int getShutdownReplicatorSeconds() {
+        return shutdownReplicatorSeconds;
+    }
+
+    /**
+     * @param shutdownReplicatorSeconds the number of seconds stop() waits for
+     * the executor to terminate; with a non-positive value stop() calls
+     * shutdownNow() instead of shutdown()
+     */
+    public void setShutdownReplicatorSeconds(int shutdownReplicatorSeconds) {
+        this.shutdownReplicatorSeconds = shutdownReplicatorSeconds;
+    }
+
+    /**
+     * Replicates a parameter-based request (no entity body) to the given
+     * nodes. Only the path of the given URI is reused; the scheme comes from
+     * the configured node protocol scheme or, if none is configured, from the
+     * URI.
+     *
+     * @throws IllegalArgumentException if any argument is null
+     * @throws UriConstructionException if a node URI cannot be constructed
+     */
+    @Override
+    public Set<NodeResponse> replicate(final Set<NodeIdentifier> nodeIds, final String method,
+            final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers)
+            throws UriConstructionException {
+        // each argument is checked on its own, in declaration order
+        if (nodeIds == null) {
+            throw new IllegalArgumentException("Node IDs may not be null.");
+        }
+        if (method == null) {
+            throw new IllegalArgumentException("HTTP method may not be null.");
+        }
+        if (uri == null) {
+            throw new IllegalArgumentException("URI may not be null.");
+        }
+        if (parameters == null) {
+            throw new IllegalArgumentException("Parameters may not be null.");
+        }
+        if (headers == null) {
+            throw new IllegalArgumentException("HTTP headers map may not be null.");
+        }
+
+        final String scheme = getNodeProtocolScheme(uri);
+        final String path = uri.getPath();
+        return replicateHelper(nodeIds, method, scheme, path, parameters, /* entity */ null, headers);
+    }
+
+    /**
+     * Replicates an entity-carrying request to the given nodes. Only the path
+     * of the given URI is reused; the scheme comes from the configured node
+     * protocol scheme or, if none is configured, from the URI.
+     *
+     * @throws IllegalArgumentException if any argument is null, or if the
+     * method is DELETE, GET, HEAD or OPTIONS (compared ignoring case)
+     * @throws UriConstructionException if a node URI cannot be constructed
+     */
+    @Override
+    public Set<NodeResponse> replicate(final Set<NodeIdentifier> nodeIds, final String method, final URI uri,
+            final Object entity, final Map<String, String> headers) throws UriConstructionException {
+        if (nodeIds == null) {
+            throw new IllegalArgumentException("Node IDs may not be null.");
+        }
+        if (method == null) {
+            throw new IllegalArgumentException("HTTP method may not be null.");
+        }
+
+        // methods that must not be given an entity body
+        final String[] bodilessMethods = {HttpMethod.DELETE, HttpMethod.GET, HttpMethod.HEAD, HttpMethod.OPTIONS};
+        for (final String bodilessMethod : bodilessMethods) {
+            if (method.equalsIgnoreCase(bodilessMethod)) {
+                throw new IllegalArgumentException("HTTP (DELETE | GET | HEAD | OPTIONS) requests cannot have a body containing an entity.");
+            }
+        }
+
+        if (uri == null) {
+            throw new IllegalArgumentException("URI may not be null.");
+        }
+        if (entity == null) {
+            throw new IllegalArgumentException("Entity may not be null.");
+        }
+        if (headers == null) {
+            throw new IllegalArgumentException("HTTP headers map may not be null.");
+        }
+
+        final String scheme = getNodeProtocolScheme(uri);
+        final String path = uri.getPath();
+        return replicateHelper(nodeIds, method, scheme, path, /* parameters */ null, entity, headers);
+    }
+
+    private Set<NodeResponse> replicateHelper(final Set<NodeIdentifier> nodeIds, final String method, final String scheme,
+            final String path, final Map<String, List<String>> parameters, final Object entity, final Map<String, String> headers)
+            throws UriConstructionException {
+
+        if (nodeIds.isEmpty()) {
+            return new HashSet<>(); // return quickly for trivial case
+        }
+
+        final CompletionService<NodeResponse> completionService = new ExecutorCompletionService<>(executorService);
+
+        // keeps track of future requests so that failed requests can be tied back to the failing node
+        final Collection<NodeHttpRequestFutureWrapper> futureNodeHttpRequests = new ArrayList<>();
+
+        // construct the URIs for the nodes
+        final Map<NodeIdentifier, URI> uriMap = new HashMap<>();
+        try {
+            for (final NodeIdentifier nodeId : nodeIds) {
+                final URI nodeUri = new URI(scheme, null, nodeId.getApiAddress(), nodeId.getApiPort(), path, /* query */ null, /* fragment */ null);
+                uriMap.put(nodeId, nodeUri);
+            }
+        } catch (final URISyntaxException use) {
+            throw new UriConstructionException(use);
+        }
+
+        // submit the requests to the nodes
+        final String requestId = UUID.randomUUID().toString();
+        headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
+        for (final Map.Entry<NodeIdentifier, URI> entry : uriMap.entrySet()) {
+            final NodeIdentifier nodeId = entry.getKey();
+            final URI nodeUri = entry.getValue();
+            final NodeHttpRequestCallable callable = (entity == null)
+                    ? new NodeHttpRequestCallable(nodeId, method, nodeUri, parameters, headers)
+                    : new NodeHttpRequestCallable(nodeId, method, nodeUri, entity, headers);
+            futureNodeHttpRequests.add(new NodeHttpRequestFutureWrapper(nodeId, method, nodeUri, completionService.submit(callable)));
+        }
+
+        // get the node responses
+        final Set<NodeResponse> result = new HashSet<>();
+        for (int i = 0; i < nodeIds.size(); i++) {
+
+            // keeps track of the original request information in case we receive an exception
+            NodeHttpRequestFutureWrapper futureNodeHttpRequest = null;
+            try {
+
+                // get the future resource response for the node
+                final Future<NodeResponse> futureNodeResourceResponse = completionService.take();
+
+                // find the original request by comparing the submitted future with the future returned by the completion service
+                for (final NodeHttpRequestFutureWrapper futureNodeHttpRequestElem : futureNodeHttpRequests) {
+                    if (futureNodeHttpRequestElem.getFuture() == futureNodeResourceResponse) {
+                        futureNodeHttpRequest = futureNodeHttpRequestElem;
+                    }
+                }
+
+                // try to retrieve the node response and add to result
+                final NodeResponse nodeResponse = futureNodeResourceResponse.get();
+                result.add(nodeResponse);
+
+            } catch (final InterruptedException | ExecutionException ex) {
+
+                logger.warn("Node request for " + futureNodeHttpRequest.getNodeId() + " encountered exception: " + ex, ex);
+
+                // create node response with the thrown exception and add to result
+                final NodeResponse nodeResponse = new NodeResponse(
+                        futureNodeHttpRequest.getNodeId(), futureNodeHttpRequest.getHttpMethod(), futureNodeHttpRequest.getRequestUri(), ex);
+                result.add(nodeResponse);
+
+            }
+        }
+
+        if (logger.isDebugEnabled()) {
+            NodeResponse min = null;
+            NodeResponse max = null;
+            long nanosSum = 0L;
+            int nanosAdded = 0;
+
+            for (final NodeResponse response : result) {
+                final long requestNanos = response.getRequestDuration(TimeUnit.NANOSECONDS);
+                final long minNanos = (min == null) ? -1 : min.getRequestDuration(TimeUnit.NANOSECONDS);
+                final long maxNanos = (max == null) ? -1 : max.getRequestDuration(TimeUnit.NANOSECONDS);
+
+                if (requestNanos < minNanos || minNanos < 0L) {
+                    min = response;
+                }
+
+                if (requestNanos > maxNanos || maxNanos < 0L) {
+                    max = response;
+                }
+
+                if (requestNanos >= 0L) {
+                    nanosSum += requestNanos;
+                    nanosAdded++;
+                }
+            }
+
+            final StringBuilder sb = new StringBuilder();
+            sb.append("Node Responses for ").append(method).append(" ").append(path).append(" (Request ID ").append(requestId).append("):\n");
+            for (final NodeResponse response : result) {
+                sb.append(response).append("\n");
+            }
+
+            final long averageNanos = (nanosAdded == 0) ? -1L : nanosSum / nanosAdded;
+            final long averageMillis = (averageNanos < 0) ? averageNanos : TimeUnit.MILLISECONDS.convert(averageNanos, TimeUnit.NANOSECONDS);
+            logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms",
+                    method, path, requestId, min, max, averageMillis);
+            logger.debug(sb.toString());
+        }
+
+        return result;
+    }
+
+    /**
+     * Pairs a future node response with the node, HTTP method, and URI of the
+     * originating request, so that a future that encountered an exception can
+     * still be linked back to the failing node. Declared static because it never
+     * touches the enclosing instance. The constructor throws an
+     * IllegalArgumentException for a null node ID, request URI, or future and
+     * for a blank HTTP method.
+     */
+    private static class NodeHttpRequestFutureWrapper {
+
+        private final NodeIdentifier nodeId;
+        private final String httpMethod;
+        private final URI requestUri;
+        private final Future<NodeResponse> future;
+
+        public NodeHttpRequestFutureWrapper(final NodeIdentifier nodeId, final String httpMethod,
+                final URI requestUri, final Future<NodeResponse> future) {
+            if (nodeId == null) {
+                throw new IllegalArgumentException("Node ID may not be null.");
+            } else if (StringUtils.isBlank(httpMethod)) {
+                throw new IllegalArgumentException("Http method may not be null or empty.");
+            } else if (requestUri == null) {
+                throw new IllegalArgumentException("Request URI may not be null.");
+            } else if (future == null) {
+                throw new IllegalArgumentException("Future may not be null.");
+            }
+            this.nodeId = nodeId;
+            this.httpMethod = httpMethod;
+            this.requestUri = requestUri;
+            this.future = future;
+        }
+
+        public NodeIdentifier getNodeId() {
+            return nodeId;
+        }
+
+        public String getHttpMethod() {
+            return httpMethod;
+        }
+
+        public URI getRequestUri() {
+            return requestUri;
+        }
+
+        public Future<NodeResponse> getFuture() {
+            return future;
+        }
+    }
+
+    /**
+     * Callable that sends one HTTP request to one node and returns a NodeResponse; a
+     * UniformInterfaceException or IllegalArgumentException is wrapped in the response.
+     */
+    private class NodeHttpRequestCallable implements Callable<NodeResponse> {
+
+        private final NodeIdentifier nodeId;
+        private final String method;
+        private final URI uri;
+        private final Object entity;
+        private final Map<String, List<String>> parameters = new HashMap<>();
+        private final Map<String, String> headers = new HashMap<>();
+
+        // Used when the request carries an entity; the parameters stay empty.
+        private NodeHttpRequestCallable(final NodeIdentifier nodeId, final String method,
+                final URI uri, final Object entity, final Map<String, String> headers) {
+            this.nodeId = nodeId;
+            this.method = method;
+            this.uri = uri;
+            this.entity = entity;
+            this.headers.putAll(headers);
+        }
+
+        // Used when the request is described by parameters; no entity is set.
+        private NodeHttpRequestCallable(final NodeIdentifier nodeId, final String method,
+                final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers) {
+            this.nodeId = nodeId;
+            this.method = method;
+            this.uri = uri;
+            this.entity = null;
+            this.parameters.putAll(parameters);
+            this.headers.putAll(headers);
+        }
+
+        @Override
+        public NodeResponse call() {
+            try {
+                final WebResource.Builder requestBuilder = getResourceBuilder();
+                final String requestId = headers.get("x-nifi-request-id");
+
+                // only the exchange itself is timed, not the request preparation
+                final long startNanos = System.nanoTime();
+                final ClientResponse clientResponse = executeRequest(requestBuilder);
+                final long elapsedNanos = System.nanoTime() - startNanos;
+
+                return new NodeResponse(nodeId, method, uri, clientResponse, elapsedNanos, requestId);
+            } catch (final UniformInterfaceException | IllegalArgumentException failure) {
+                // these failures are reported through the response instead of being thrown
+                return new NodeResponse(nodeId, method, uri, failure);
+            }
+        }
+
+        // Sends the request with the verb matching this callable's method (case-insensitive).
+        private ClientResponse executeRequest(final WebResource.Builder requestBuilder) {
+            if (HttpMethod.DELETE.equalsIgnoreCase(method)) {
+                return requestBuilder.delete(ClientResponse.class);
+            } else if (HttpMethod.GET.equalsIgnoreCase(method)) {
+                return requestBuilder.get(ClientResponse.class);
+            } else if (HttpMethod.HEAD.equalsIgnoreCase(method)) {
+                return requestBuilder.head();
+            } else if (HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+                return requestBuilder.options(ClientResponse.class);
+            } else if (HttpMethod.POST.equalsIgnoreCase(method)) {
+                return requestBuilder.post(ClientResponse.class);
+            } else if (HttpMethod.PUT.equalsIgnoreCase(method)) {
+                return requestBuilder.put(ClientResponse.class);
+            }
+            throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication.");
+        }
+
+        private WebResource.Builder getResourceBuilder() {
+            // copy the parameters into the multivalued map that is handed to the resource below
+            final MultivaluedMap<String, String> requestParams = new MultivaluedMapImpl();
+            requestParams.putAll(parameters);
+
+            final WebResource baseResource = client.resource(uri);
+            if (WebClusterManager.isResponseInterpreted(uri, method)) {
+                baseResource.addFilter(new GZIPContentEncodingFilter(false));
+            }
+
+            // DELETE, HEAD, GET, and OPTIONS carry the parameters in the query string; any other
+            // method sends the entity as the request body, or the parameters when no entity was given
+            final WebResource.Builder builder;
+            if (sendsParametersInQuery()) {
+                builder = baseResource.queryParams(requestParams).getRequestBuilder();
+            } else {
+                builder = baseResource.entity((entity == null) ? requestParams : entity);
+            }
+
+            // apply the headers, noting whether one of them already names a content type
+            boolean contentTypeSupplied = false;
+            for (final Map.Entry<String, String> header : headers.entrySet()) {
+                final String headerName = header.getKey();
+                builder.header(headerName, header.getValue());
+                contentTypeSupplied |= headerName.equalsIgnoreCase("content-type");
+            }
+
+            // default to form encoding when no content type was supplied
+            if (!contentTypeSupplied) {
+                builder.type(MediaType.APPLICATION_FORM_URLENCODED);
+            }
+            return builder;
+        }
+
+        // True for DELETE, HEAD, GET, and OPTIONS, whose parameters go into the query string.
+        private boolean sendsParametersInQuery() {
+            return HttpMethod.DELETE.equalsIgnoreCase(method)
+                    || HttpMethod.HEAD.equalsIgnoreCase(method)
+                    || HttpMethod.GET.equalsIgnoreCase(method)
+                    || HttpMethod.OPTIONS.equalsIgnoreCase(method);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java
new file mode 100644
index 0000000..afade7e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.nifi.cluster.manager.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.logging.NiFiLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps each node's HTTP response to the status the node should have afterwards.
+ *
+ * When at least one node answered with 2XX, every node that did not is marked
+ * disconnected, because a 2XX response may have changed a node's flow.
+ *
+ * When no node answered with 2XX, no node's flow has changed, so rather than
+ * disconnecting everything only the nodes with internal errors, i.e., 5XX
+ * responses, are marked disconnected.
+ *
+ * @author unattributed
+ */
+public class HttpResponseMapperImpl implements HttpResponseMapper {
+
+    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(HttpResponseMapperImpl.class));
+
+    /**
+     * Decides, for every response in the given set, whether the responding node
+     * stays connected or is to be disconnected.
+     *
+     * @param requestURI the URI of the request; not consulted by this implementation
+     * @param nodeResponses the responses to classify
+     * @return a map holding one entry per response, keyed by the response
+     */
+    @Override
+    public Map<NodeResponse, Status> map(final URI requestURI, final Set<NodeResponse> nodeResponses) {
+        final boolean anySucceeded = containsSuccess(nodeResponses);
+
+        final Map<NodeResponse, Status> statusByResponse = new HashMap<>();
+        for (final NodeResponse response : nodeResponses) {
+            // with a success present, anything but 2XX disconnects; otherwise only 5XX does
+            // (presumably is5xx() also covers responses holding an exception - confirm)
+            final boolean disconnect = anySucceeded ? !response.is2xx() : response.is5xx();
+            final Status status = disconnect ? Node.Status.DISCONNECTED : Node.Status.CONNECTED;
+            statusByResponse.put(response, status);
+        }
+
+        return statusByResponse;
+    }
+
+    /**
+     * Scans the responses for a success.
+     *
+     * @param nodeResponses the responses to scan
+     * @return true if at least one of the responses is 2XX
+     */
+    private static boolean containsSuccess(final Set<NodeResponse> nodeResponses) {
+        for (final NodeResponse response : nodeResponses) {
+            if (response.is2xx()) {
+                return true;
+            }
+        }
+        return false;
+    }
+}