You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:36 UTC
[13/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java
new file mode 100644
index 0000000..412a555
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/DisconnectedNodeMutableRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a HTTP request that may change a node's
+ * dataflow is to be replicated while one or more nodes are disconnected.
+ *
+ * @author unattributed
+ */
+public class DisconnectedNodeMutableRequestException extends MutableRequestException {
+
+ public DisconnectedNodeMutableRequestException() {
+ }
+
+ public DisconnectedNodeMutableRequestException(String msg) {
+ super(msg);
+ }
+
+ public DisconnectedNodeMutableRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ public DisconnectedNodeMutableRequestException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
new file mode 100644
index 0000000..6c4e670
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalClusterStateException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Signals that an operation to be performed on a cluster has been invoked at an
+ * illegal or inappropriate time.
+ *
+ * @author unattributed
+ */
+public class IllegalClusterStateException extends ClusterException {
+
+ public IllegalClusterStateException() {
+ }
+
+ public IllegalClusterStateException(String msg) {
+ super(msg);
+ }
+
+ public IllegalClusterStateException(Throwable cause) {
+ super(cause);
+ }
+
+ public IllegalClusterStateException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java
new file mode 100644
index 0000000..adef62a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDeletionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a deletion request is issued to a node
+ * that cannot be deleted (e.g., the node is not disconnected).
+ *
+ * @author unattributed
+ */
+public class IllegalNodeDeletionException extends IllegalClusterStateException {
+
+ public IllegalNodeDeletionException() {
+ }
+
+ public IllegalNodeDeletionException(String msg) {
+ super(msg);
+ }
+
+ public IllegalNodeDeletionException(Throwable cause) {
+ super(cause);
+ }
+
+ public IllegalNodeDeletionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
new file mode 100644
index 0000000..7e61b24
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeDisconnectionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a disconnection request is issued to a
+ * node that cannot be disconnected (e.g., last node in cluster, node is primary
+ * node).
+ *
+ * @author unattributed
+ */
+public class IllegalNodeDisconnectionException extends IllegalClusterStateException {
+
+ public IllegalNodeDisconnectionException() {
+ }
+
+ public IllegalNodeDisconnectionException(String msg) {
+ super(msg);
+ }
+
+ public IllegalNodeDisconnectionException(Throwable cause) {
+ super(cause);
+ }
+
+ public IllegalNodeDisconnectionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java
new file mode 100644
index 0000000..96c76bc
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeReconnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a reconnection request is issued to a
+ * node that cannot be reconnected (e.g., the node is not disconnected).
+ *
+ * @author unattributed
+ */
+public class IllegalNodeReconnectionException extends IllegalClusterStateException {
+
+ public IllegalNodeReconnectionException() {
+ }
+
+ public IllegalNodeReconnectionException(String msg) {
+ super(msg);
+ }
+
+ public IllegalNodeReconnectionException(Throwable cause) {
+ super(cause);
+ }
+
+ public IllegalNodeReconnectionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java
new file mode 100644
index 0000000..4b0097a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IneligiblePrimaryNodeException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the primary role cannot be assigned to a
+ * node because the node is ineligible for the role.
+ *
+ * @author unattributed
+ */
+public class IneligiblePrimaryNodeException extends IllegalClusterStateException {
+
+ public IneligiblePrimaryNodeException() {
+ }
+
+ public IneligiblePrimaryNodeException(String msg) {
+ super(msg);
+ }
+
+ public IneligiblePrimaryNodeException(Throwable cause) {
+ super(cause);
+ }
+
+ public IneligiblePrimaryNodeException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java
new file mode 100644
index 0000000..d160587
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/MutableRequestException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a HTTP request that may change a node's
+ * state is to be replicated while the cluster or connected nodes are unable to
+ * change their state (e.g., a new node is connecting to the cluster).
+ *
+ * @author unattributed
+ */
+public class MutableRequestException extends IllegalClusterStateException {
+
+ public MutableRequestException() {
+ }
+
+ public MutableRequestException(String msg) {
+ super(msg);
+ }
+
+ public MutableRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ public MutableRequestException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
new file mode 100644
index 0000000..8d704b9
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoConnectedNodesException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the cluster is unable to service a
+ * request because no nodes are connected.
+ *
+ * @author unattributed
+ */
+public class NoConnectedNodesException extends ClusterException {
+
+ public NoConnectedNodesException() {
+ }
+
+ public NoConnectedNodesException(String msg) {
+ super(msg);
+ }
+
+ public NoConnectedNodesException(Throwable cause) {
+ super(cause);
+ }
+
+ public NoConnectedNodesException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
new file mode 100644
index 0000000..9e17a23
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NoResponseFromNodesException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the cluster is unable to service a
+ * request because no nodes returned a response. When the given request is not
+ * mutable the nodes are left in their previous state.
+ *
+ * @author unattributed
+ */
+public class NoResponseFromNodesException extends ClusterException {
+
+ public NoResponseFromNodesException() {
+ }
+
+ public NoResponseFromNodesException(String msg) {
+ super(msg);
+ }
+
+ public NoResponseFromNodesException(Throwable cause) {
+ super(cause);
+ }
+
+ public NoResponseFromNodesException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
new file mode 100644
index 0000000..3bd2f4b
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeDisconnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a disconnection request to a node
+ * failed.
+ *
+ * @author unattributed
+ */
+public class NodeDisconnectionException extends ClusterException {
+
+ public NodeDisconnectionException() {
+ }
+
+ public NodeDisconnectionException(String msg) {
+ super(msg);
+ }
+
+ public NodeDisconnectionException(Throwable cause) {
+ super(cause);
+ }
+
+ public NodeDisconnectionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
new file mode 100644
index 0000000..8c40cef
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/NodeReconnectionException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a reconnection request to a node failed.
+ *
+ * @author unattributed
+ */
+public class NodeReconnectionException extends ClusterException {
+
+ public NodeReconnectionException() {
+ }
+
+ public NodeReconnectionException(String msg) {
+ super(msg);
+ }
+
+ public NodeReconnectionException(Throwable cause) {
+ super(cause);
+ }
+
+ public NodeReconnectionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java
new file mode 100644
index 0000000..403f7a5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/PrimaryRoleAssignmentException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when the cluster is unable to update the
+ * primary role of a node.
+ *
+ * @author unattributed
+ */
+public class PrimaryRoleAssignmentException extends IllegalClusterStateException {
+
+ public PrimaryRoleAssignmentException() {
+ }
+
+ public PrimaryRoleAssignmentException(String msg) {
+ super(msg);
+ }
+
+ public PrimaryRoleAssignmentException(Throwable cause) {
+ super(cause);
+ }
+
+ public PrimaryRoleAssignmentException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java
new file mode 100644
index 0000000..f544f26
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/SafeModeMutableRequestException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a HTTP request that may change a node's
+ * dataflow is to be replicated while the cluster is in safe mode.
+ *
+ * @author unattributed
+ */
+public class SafeModeMutableRequestException extends MutableRequestException {
+
+ public SafeModeMutableRequestException() {
+ }
+
+ public SafeModeMutableRequestException(String msg) {
+ super(msg);
+ }
+
+ public SafeModeMutableRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ public SafeModeMutableRequestException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
new file mode 100644
index 0000000..914bb56
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UnknownNodeException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a request is made for a node that does
+ * not exist.
+ *
+ * @author unattributed
+ */
+public class UnknownNodeException extends ClusterException {
+
+ public UnknownNodeException() {
+ }
+
+ public UnknownNodeException(String msg) {
+ super(msg);
+ }
+
+ public UnknownNodeException(Throwable cause) {
+ super(cause);
+ }
+
+ public UnknownNodeException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java
new file mode 100644
index 0000000..773d7b5
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/exception/UriConstructionException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when a URI cannot be constructed from the
+ * given information. This exception is similar to Java's URISyntaxException
+ * except that it extends RuntimeException.
+ *
+ * @author unattributed
+ */
+public class UriConstructionException extends RuntimeException {
+
+ public UriConstructionException() {
+ }
+
+ public UriConstructionException(String msg) {
+ super(msg);
+ }
+
+ public UriConstructionException(Throwable cause) {
+ super(cause);
+ }
+
+ public UriConstructionException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
new file mode 100644
index 0000000..2015530
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.reporting.EventAccess;
+
+public class ClusteredEventAccess implements EventAccess {
+
+ private final WebClusterManager clusterManager;
+
+ public ClusteredEventAccess(final WebClusterManager clusterManager) {
+ this.clusterManager = clusterManager;
+ }
+
+ @Override
+ public ProcessGroupStatus getControllerStatus() {
+ return clusterManager.getProcessGroupStatus(WebClusterManager.ROOT_GROUP_ID_ALIAS);
+ }
+
+ @Override
+ public List<ProvenanceEventRecord> getProvenanceEvents(long arg0, int arg1) throws IOException {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public ProvenanceEventRepository getProvenanceRepository() {
+ // NCM doesn't have provenance events, because it doesn't process FlowFiles.
+ // So we just use a Provenance Event Repository that does nothing.
+ return new ProvenanceEventRepository() {
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public ProvenanceEventRecord getEvent(long eventId) throws IOException {
+ return null;
+ }
+
+ @Override
+ public List<ProvenanceEventRecord> getEvents(long startEventId, int maxEvents) throws IOException {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public Long getMaxEventId() {
+ return null;
+ }
+
+ @Override
+ public List<SearchableField> getSearchableAttributes() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<SearchableField> getSearchableFields() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void registerEvent(final ProvenanceEventRecord event) {
+ }
+
+ @Override
+ public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+ }
+
+ @Override
+ public ComputeLineageSubmission retrieveLineageSubmission(final String submissionId) {
+ return null;
+ }
+
+ @Override
+ public QuerySubmission retrieveQuerySubmission(final String submissionId) {
+ return null;
+ }
+
+ @Override
+ public ComputeLineageSubmission submitExpandChildren(final long eventId) {
+ return null;
+ }
+
+ @Override
+ public ComputeLineageSubmission submitExpandParents(final long eventId) {
+ return null;
+ }
+
+ @Override
+ public ComputeLineageSubmission submitLineageComputation(final String flowFileUuid) {
+ return null;
+ }
+
+ @Override
+ public QuerySubmission submitQuery(final Query query) {
+ return null;
+ }
+
+ @Override
+ public ProvenanceEventBuilder eventBuilder() {
+ return null;
+ }
+
+ @Override
+ public void initialize(EventReporter eventReporter) throws IOException {
+
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
new file mode 100644
index 0000000..e546f87
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.attribute.expression.language.PreparedQuery;
+import org.apache.nifi.attribute.expression.language.Query;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.status.PortStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.processor.StandardPropertyValue;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+
+public class ClusteredReportingContext implements ReportingContext {
+
+ private final EventAccess eventAccess;
+ private final BulletinRepository bulletinRepository;
+ private final ControllerServiceProvider serviceProvider;
+ private final Map<PropertyDescriptor, String> properties;
+ private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
+
+ public ClusteredReportingContext(final EventAccess eventAccess, final BulletinRepository bulletinRepository,
+ final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider) {
+ this.eventAccess = eventAccess;
+ this.bulletinRepository = bulletinRepository;
+ this.properties = Collections.unmodifiableMap(properties);
+ this.serviceProvider = serviceProvider;
+
+ preparedQueries = new HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
+ final PropertyDescriptor desc = entry.getKey();
+ String value = entry.getValue();
+ if (value == null) {
+ value = desc.getDefaultValue();
+ }
+
+ final PreparedQuery pq = Query.prepare(value);
+ preparedQueries.put(desc, pq);
+ }
+ }
+
+ @Override
+ public EventAccess getEventAccess() {
+ return eventAccess;
+ }
+
+ @Override
+ public BulletinRepository getBulletinRepository() {
+ return bulletinRepository;
+ }
+
+ @Override
+ public Bulletin createBulletin(final String category, final Severity severity, final String message) {
+ return BulletinFactory.createBulletin(category, severity.name(), message);
+ }
+
+ @Override
+ public Bulletin createBulletin(final String componentId, final String category, final Severity severity, final String message) {
+ final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
+ final String groupId = findGroupId(rootGroupStatus, componentId);
+ final String componentName = findComponentName(rootGroupStatus, componentId);
+
+ return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message);
+ }
+
+ @Override
+ public Map<PropertyDescriptor, String> getProperties() {
+ return Collections.unmodifiableMap(properties);
+ }
+
+ @Override
+ public PropertyValue getProperty(final PropertyDescriptor property) {
+ final String configuredValue = properties.get(property);
+ return new StandardPropertyValue(configuredValue == null ? property.getDefaultValue() : configuredValue, serviceProvider, preparedQueries.get(property));
+ }
+
+ @Override
+ public ControllerServiceLookup getControllerServiceLookup() {
+ return serviceProvider;
+ }
+
+ String findGroupId(final ProcessGroupStatus groupStatus, final String componentId) {
+ for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+ if (procStatus.getId().equals(componentId)) {
+ return groupStatus.getId();
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return groupStatus.getId();
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return groupStatus.getId();
+ }
+ }
+
+ for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+ final String groupId = findGroupId(childGroup, componentId);
+ if (groupId != null) {
+ return groupId;
+ }
+ }
+
+ return null;
+ }
+
+ private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) {
+ for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+ if (procStatus.getId().equals(componentId)) {
+ return procStatus.getName();
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return groupStatus.getName();
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return groupStatus.getName();
+ }
+ }
+
+ for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+ final String componentName = findComponentName(childGroup, componentId);
+ if (componentName != null) {
+ return componentName;
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
new file mode 100644
index 0000000..81bb7a7
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpRequestReplicatorImpl.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.HttpMethod;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.nifi.cluster.manager.HttpRequestReplicator;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.UriConstructionException;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.util.FormatUtils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the <code>HttpRequestReplicator</code> interface. This
+ * implementation parallelizes the node HTTP requests using the given
+ * <code>ExecutorService</code> instance. Individual requests may have
+ * connection and read timeouts set, which may be set during instance
+ * construction. Otherwise, the default is not to timeout.
+ *
+ * If a node protocol scheme is provided during construction, then all requests
+ * will be replicated using the given scheme. If null is provided as the scheme
+ * (the default), then the requests will be replicated using the scheme of the
+ * original URI.
+ *
+ * Clients must call start() and stop() to initialize and shutdown the instance.
+ * The instance must be started before issuing any replication requests.
+ *
+ * @author unattributed
+ */
+public class HttpRequestReplicatorImpl implements HttpRequestReplicator {
+
+ // defaults
+ private static final int DEFAULT_SHUTDOWN_REPLICATOR_SECONDS = 30;
+
+ // logger
+ private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(HttpRequestReplicatorImpl.class));
+
+ // final members
+ private final Client client; // the client to use for issuing requests
+ private final int numThreads; // number of threads to use for request replication
+ private final int connectionTimeoutMs; // connection timeout per node request
+ private final int readTimeoutMs; // read timeout per node request
+
+ // members
+ private ExecutorService executorService;
+ private int shutdownReplicatorSeconds = DEFAULT_SHUTDOWN_REPLICATOR_SECONDS;
+
+ // guarded by synchronized method access in support of multithreaded replication
+ private String nodeProtocolScheme = null;
+
+ /**
+ * Creates an instance. The connection timeout and read timeout will be
+ * infinite.
+ *
+ * @param numThreads the number of threads to use when parallelizing
+ * requests
+ * @param client a client for making requests
+ */
+ public HttpRequestReplicatorImpl(final int numThreads, final Client client) {
+ this(numThreads, client, "0 sec", "0 sec");
+ }
+
+ /**
+ * Creates an instance.
+ *
+ * @param numThreads the number of threads to use when parallelizing
+ * requests
+ * @param client a client for making requests
+ * @param connectionTimeoutMs the connection timeout specified in
+ * milliseconds
+ * @param readTimeoutMs the read timeout specified in milliseconds
+ */
+ public HttpRequestReplicatorImpl(final int numThreads, final Client client, final String connectionTimeout, final String readTimeout) {
+
+ if (numThreads <= 0) {
+ throw new IllegalArgumentException("The number of threads must be greater than zero.");
+ } else if (client == null) {
+ throw new IllegalArgumentException("Client may not be null.");
+ }
+
+ this.numThreads = numThreads;
+ this.client = client;
+ this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
+ this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
+
+ client.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT, connectionTimeoutMs);
+ client.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT, readTimeoutMs);
+ client.getProperties().put(ClientConfig.PROPERTY_FOLLOW_REDIRECTS, Boolean.TRUE);
+ }
+
+ @Override
+ public void start() {
+ if (isRunning()) {
+ throw new IllegalStateException("Instance is already started.");
+ }
+ executorService = Executors.newFixedThreadPool(numThreads);
+ }
+
+ @Override
+ public boolean isRunning() {
+ return executorService != null && !executorService.isShutdown();
+ }
+
+ @Override
+ public void stop() {
+
+ if (!isRunning()) {
+ throw new IllegalStateException("Instance is already stopped.");
+ }
+
+ // shutdown executor service
+ try {
+ if (getShutdownReplicatorSeconds() <= 0) {
+ executorService.shutdownNow();
+ } else {
+ executorService.shutdown();
+ }
+ executorService.awaitTermination(getShutdownReplicatorSeconds(), TimeUnit.SECONDS);
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ } finally {
+ if (executorService.isTerminated()) {
+ logger.info("HTTP Request Replicator has been terminated successfully.");
+ } else {
+ logger.warn("HTTP Request Replicator has not terminated properly. There exists an uninterruptable thread that will take an indeterminate amount of time to stop.");
+ }
+ }
+ }
+
+ /**
+ * Sets the protocol scheme to use when issuing requests to nodes.
+ *
+ * @param nodeProtocolScheme the scheme. Valid values are "http", "https",
+ * or null. If null is specified, then the scheme of the originating request
+ * is used when replicating that request.
+ */
+ public synchronized void setNodeProtocolScheme(final String nodeProtocolScheme) {
+ if (StringUtils.isNotBlank(nodeProtocolScheme)) {
+ if (!"http".equalsIgnoreCase(nodeProtocolScheme) && !"https".equalsIgnoreCase(nodeProtocolScheme)) {
+ throw new IllegalArgumentException("Node Protocol Scheme must be either HTTP or HTTPS");
+ }
+ }
+ this.nodeProtocolScheme = nodeProtocolScheme;
+ }
+
+ public synchronized String getNodeProtocolScheme() {
+ return nodeProtocolScheme;
+ }
+
+ private synchronized String getNodeProtocolScheme(final URI uri) {
+ // if we are not configured to use a protocol scheme, then use the uri's scheme
+ if (StringUtils.isBlank(nodeProtocolScheme)) {
+ return uri.getScheme();
+ }
+ return nodeProtocolScheme;
+ }
+
+ public int getConnectionTimeoutMs() {
+ return connectionTimeoutMs;
+ }
+
+ public int getReadTimeoutMs() {
+ return readTimeoutMs;
+ }
+
+ public int getShutdownReplicatorSeconds() {
+ return shutdownReplicatorSeconds;
+ }
+
+ public void setShutdownReplicatorSeconds(int shutdownReplicatorSeconds) {
+ this.shutdownReplicatorSeconds = shutdownReplicatorSeconds;
+ }
+
+ @Override
+ public Set<NodeResponse> replicate(final Set<NodeIdentifier> nodeIds, final String method,
+ final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers)
+ throws UriConstructionException {
+ if (nodeIds == null) {
+ throw new IllegalArgumentException("Node IDs may not be null.");
+ } else if (method == null) {
+ throw new IllegalArgumentException("HTTP method may not be null.");
+ } else if (uri == null) {
+ throw new IllegalArgumentException("URI may not be null.");
+ } else if (parameters == null) {
+ throw new IllegalArgumentException("Parameters may not be null.");
+ } else if (headers == null) {
+ throw new IllegalArgumentException("HTTP headers map may not be null.");
+ }
+ return replicateHelper(nodeIds, method, getNodeProtocolScheme(uri), uri.getPath(), parameters, /* entity */ null, headers);
+ }
+
+ @Override
+ public Set<NodeResponse> replicate(final Set<NodeIdentifier> nodeIds, final String method, final URI uri,
+ final Object entity, final Map<String, String> headers) throws UriConstructionException {
+ if (nodeIds == null) {
+ throw new IllegalArgumentException("Node IDs may not be null.");
+ } else if (method == null) {
+ throw new IllegalArgumentException("HTTP method may not be null.");
+ } else if (method.equalsIgnoreCase(HttpMethod.DELETE) || method.equalsIgnoreCase(HttpMethod.GET) || method.equalsIgnoreCase(HttpMethod.HEAD) || method.equalsIgnoreCase(HttpMethod.OPTIONS)) {
+ throw new IllegalArgumentException("HTTP (DELETE | GET | HEAD | OPTIONS) requests cannot have a body containing an entity.");
+ } else if (uri == null) {
+ throw new IllegalArgumentException("URI may not be null.");
+ } else if (entity == null) {
+ throw new IllegalArgumentException("Entity may not be null.");
+ } else if (headers == null) {
+ throw new IllegalArgumentException("HTTP headers map may not be null.");
+ }
+ return replicateHelper(nodeIds, method, getNodeProtocolScheme(uri), uri.getPath(), /* parameters */ null, entity, headers);
+ }
+
+ private Set<NodeResponse> replicateHelper(final Set<NodeIdentifier> nodeIds, final String method, final String scheme,
+ final String path, final Map<String, List<String>> parameters, final Object entity, final Map<String, String> headers)
+ throws UriConstructionException {
+
+ if (nodeIds.isEmpty()) {
+ return new HashSet<>(); // return quickly for trivial case
+ }
+
+ final CompletionService<NodeResponse> completionService = new ExecutorCompletionService<>(executorService);
+
+ // keeps track of future requests so that failed requests can be tied back to the failing node
+ final Collection<NodeHttpRequestFutureWrapper> futureNodeHttpRequests = new ArrayList<>();
+
+ // construct the URIs for the nodes
+ final Map<NodeIdentifier, URI> uriMap = new HashMap<>();
+ try {
+ for (final NodeIdentifier nodeId : nodeIds) {
+ final URI nodeUri = new URI(scheme, null, nodeId.getApiAddress(), nodeId.getApiPort(), path, /* query */ null, /* fragment */ null);
+ uriMap.put(nodeId, nodeUri);
+ }
+ } catch (final URISyntaxException use) {
+ throw new UriConstructionException(use);
+ }
+
+ // submit the requests to the nodes
+ final String requestId = UUID.randomUUID().toString();
+ headers.put(WebClusterManager.REQUEST_ID_HEADER, requestId);
+ for (final Map.Entry<NodeIdentifier, URI> entry : uriMap.entrySet()) {
+ final NodeIdentifier nodeId = entry.getKey();
+ final URI nodeUri = entry.getValue();
+ final NodeHttpRequestCallable callable = (entity == null)
+ ? new NodeHttpRequestCallable(nodeId, method, nodeUri, parameters, headers)
+ : new NodeHttpRequestCallable(nodeId, method, nodeUri, entity, headers);
+ futureNodeHttpRequests.add(new NodeHttpRequestFutureWrapper(nodeId, method, nodeUri, completionService.submit(callable)));
+ }
+
+ // get the node responses
+ final Set<NodeResponse> result = new HashSet<>();
+ for (int i = 0; i < nodeIds.size(); i++) {
+
+ // keeps track of the original request information in case we receive an exception
+ NodeHttpRequestFutureWrapper futureNodeHttpRequest = null;
+ try {
+
+ // get the future resource response for the node
+ final Future<NodeResponse> futureNodeResourceResponse = completionService.take();
+
+ // find the original request by comparing the submitted future with the future returned by the completion service
+ for (final NodeHttpRequestFutureWrapper futureNodeHttpRequestElem : futureNodeHttpRequests) {
+ if (futureNodeHttpRequestElem.getFuture() == futureNodeResourceResponse) {
+ futureNodeHttpRequest = futureNodeHttpRequestElem;
+ }
+ }
+
+ // try to retrieve the node response and add to result
+ final NodeResponse nodeResponse = futureNodeResourceResponse.get();
+ result.add(nodeResponse);
+
+ } catch (final InterruptedException | ExecutionException ex) {
+
+ logger.warn("Node request for " + futureNodeHttpRequest.getNodeId() + " encountered exception: " + ex, ex);
+
+ // create node response with the thrown exception and add to result
+ final NodeResponse nodeResponse = new NodeResponse(
+ futureNodeHttpRequest.getNodeId(), futureNodeHttpRequest.getHttpMethod(), futureNodeHttpRequest.getRequestUri(), ex);
+ result.add(nodeResponse);
+
+ }
+ }
+
+ if (logger.isDebugEnabled()) {
+ NodeResponse min = null;
+ NodeResponse max = null;
+ long nanosSum = 0L;
+ int nanosAdded = 0;
+
+ for (final NodeResponse response : result) {
+ final long requestNanos = response.getRequestDuration(TimeUnit.NANOSECONDS);
+ final long minNanos = (min == null) ? -1 : min.getRequestDuration(TimeUnit.NANOSECONDS);
+ final long maxNanos = (max == null) ? -1 : max.getRequestDuration(TimeUnit.NANOSECONDS);
+
+ if (requestNanos < minNanos || minNanos < 0L) {
+ min = response;
+ }
+
+ if (requestNanos > maxNanos || maxNanos < 0L) {
+ max = response;
+ }
+
+ if (requestNanos >= 0L) {
+ nanosSum += requestNanos;
+ nanosAdded++;
+ }
+ }
+
+ final StringBuilder sb = new StringBuilder();
+ sb.append("Node Responses for ").append(method).append(" ").append(path).append(" (Request ID ").append(requestId).append("):\n");
+ for (final NodeResponse response : result) {
+ sb.append(response).append("\n");
+ }
+
+ final long averageNanos = (nanosAdded == 0) ? -1L : nanosSum / nanosAdded;
+ final long averageMillis = (averageNanos < 0) ? averageNanos : TimeUnit.MILLISECONDS.convert(averageNanos, TimeUnit.NANOSECONDS);
+ logger.debug("For {} {} (Request ID {}), minimum response time = {}, max = {}, average = {} ms",
+ method, path, requestId, min, max, averageMillis);
+ logger.debug(sb.toString());
+ }
+
+ return result;
+ }
+
+ /**
+ * Wraps a future node response with info from originating request. This
+ * coupling allows for futures that encountered exceptions to be linked back
+ * to the failing node and better reported.
+ */
+ private class NodeHttpRequestFutureWrapper {
+
+ private final NodeIdentifier nodeId;
+
+ private final String httpMethod;
+
+ private final URI requestUri;
+
+ private final Future<NodeResponse> future;
+
+ public NodeHttpRequestFutureWrapper(final NodeIdentifier nodeId, final String httpMethod,
+ final URI requestUri, final Future<NodeResponse> future) {
+ if (nodeId == null) {
+ throw new IllegalArgumentException("Node ID may not be null.");
+ } else if (StringUtils.isBlank(httpMethod)) {
+ throw new IllegalArgumentException("Http method may not be null or empty.");
+ } else if (requestUri == null) {
+ throw new IllegalArgumentException("Request URI may not be null.");
+ } else if (future == null) {
+ throw new IllegalArgumentException("Future may not be null.");
+ }
+ this.nodeId = nodeId;
+ this.httpMethod = httpMethod;
+ this.requestUri = requestUri;
+ this.future = future;
+ }
+
+ public NodeIdentifier getNodeId() {
+ return nodeId;
+ }
+
+ public String getHttpMethod() {
+ return httpMethod;
+ }
+
+ public URI getRequestUri() {
+ return requestUri;
+ }
+
+ public Future<NodeResponse> getFuture() {
+ return future;
+ }
+ }
+
+ /**
+ * A Callable for making an HTTP request to a single node and returning its
+ * response.
+ */
+ private class NodeHttpRequestCallable implements Callable<NodeResponse> {
+
+ private final NodeIdentifier nodeId;
+ private final String method;
+ private final URI uri;
+ private final Object entity;
+ private final Map<String, List<String>> parameters = new HashMap<>();
+ private final Map<String, String> headers = new HashMap<>();
+
+ private NodeHttpRequestCallable(final NodeIdentifier nodeId, final String method,
+ final URI uri, final Object entity, final Map<String, String> headers) {
+ this.nodeId = nodeId;
+ this.method = method;
+ this.uri = uri;
+ this.entity = entity;
+ this.headers.putAll(headers);
+ }
+
+ private NodeHttpRequestCallable(final NodeIdentifier nodeId, final String method,
+ final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers) {
+ this.nodeId = nodeId;
+ this.method = method;
+ this.uri = uri;
+ this.entity = null;
+ this.parameters.putAll(parameters);
+ this.headers.putAll(headers);
+ }
+
+ @Override
+ public NodeResponse call() {
+
+ try {
+ // create and send the request
+ final WebResource.Builder resourceBuilder = getResourceBuilder();
+ final String requestId = headers.get("x-nifi-request-id");
+
+ final long startNanos = System.nanoTime();
+ final ClientResponse clientResponse;
+ if (HttpMethod.DELETE.equalsIgnoreCase(method)) {
+ clientResponse = resourceBuilder.delete(ClientResponse.class);
+ } else if (HttpMethod.GET.equalsIgnoreCase(method)) {
+ clientResponse = resourceBuilder.get(ClientResponse.class);
+ } else if (HttpMethod.HEAD.equalsIgnoreCase(method)) {
+ clientResponse = resourceBuilder.head();
+ } else if (HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+ clientResponse = resourceBuilder.options(ClientResponse.class);
+ } else if (HttpMethod.POST.equalsIgnoreCase(method)) {
+ clientResponse = resourceBuilder.post(ClientResponse.class);
+ } else if (HttpMethod.PUT.equalsIgnoreCase(method)) {
+ clientResponse = resourceBuilder.put(ClientResponse.class);
+ } else {
+ throw new IllegalArgumentException("HTTP Method '" + method + "' not supported for request replication.");
+ }
+
+ // create and return the response
+ return new NodeResponse(nodeId, method, uri, clientResponse, System.nanoTime() - startNanos, requestId);
+
+ } catch (final UniformInterfaceException | IllegalArgumentException t) {
+ return new NodeResponse(nodeId, method, uri, t);
+ }
+
+ }
+
+ private WebResource.Builder getResourceBuilder() {
+
+ // convert parameters to a more convenient data structure
+ final MultivaluedMap<String, String> map = new MultivaluedMapImpl();
+ map.putAll(parameters);
+
+ // create the resource
+ WebResource resource = client.resource(uri);
+
+ if (WebClusterManager.isResponseInterpreted(uri, method)) {
+ resource.addFilter(new GZIPContentEncodingFilter(false));
+ }
+
+ // set the parameters as either query parameters or as request body
+ final WebResource.Builder builder;
+ if (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.HEAD.equalsIgnoreCase(method) || HttpMethod.GET.equalsIgnoreCase(method) || HttpMethod.OPTIONS.equalsIgnoreCase(method)) {
+ resource = resource.queryParams(map);
+ builder = resource.getRequestBuilder();
+ } else {
+ if (entity == null) {
+ builder = resource.entity(map);
+ } else {
+ builder = resource.entity(entity);
+ }
+ }
+
+ // set headers
+ boolean foundContentType = false;
+ for (final Map.Entry<String, String> entry : headers.entrySet()) {
+ builder.header(entry.getKey(), entry.getValue());
+ if (entry.getKey().equalsIgnoreCase("content-type")) {
+ foundContentType = true;
+ }
+ }
+
+ // set default content type
+ if (!foundContentType) {
+ // set default content type
+ builder.type(MediaType.APPLICATION_FORM_URLENCODED);
+ }
+
+ return builder;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java
new file mode 100644
index 0000000..afade7e
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/HttpResponseMapperImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.impl;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.nifi.cluster.manager.HttpResponseMapper;
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.node.Node;
+import org.apache.nifi.cluster.node.Node.Status;
+import org.apache.nifi.logging.NiFiLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Determines the status of nodes based on their HTTP response codes.
+ *
+ * The algorithm is as follows.
+ *
+ * If any HTTP responses were 2XX, then disconnect non-2XX responses. This is
+ * because 2XX may have changed a node's flow.
+ *
+ * If no 2XX responses were received, then the node's flow has not changed.
+ * Instead of disconnecting everything, we only disconnect the nodes with
+ * internal errors, i.e., 5XX responses.
+ *
+ * @author unattributed
+ */
+public class HttpResponseMapperImpl implements HttpResponseMapper {
+
+ private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(HttpResponseMapperImpl.class));
+
+ @Override
+ public Map<NodeResponse, Status> map(final URI requestURI, final Set<NodeResponse> nodeResponses) {
+
+ final Map<NodeResponse, Status> result = new HashMap<>();
+
+ // check if any responses were 2XX
+ boolean found2xx = false;
+ for (final NodeResponse nodeResponse : nodeResponses) {
+ if (nodeResponse.is2xx()) {
+ found2xx = true;
+ break;
+ }
+ }
+
+ // determine the status of each node
+ for (final NodeResponse nodeResponse : nodeResponses) {
+
+ final Node.Status status;
+ if (found2xx) {
+ // disconnect nodes with non-2XX responses
+ status = nodeResponse.is2xx()
+ ? Node.Status.CONNECTED
+ : Node.Status.DISCONNECTED;
+ } else {
+ // disconnect nodes with 5XX responses or exception
+ status = nodeResponse.is5xx()
+ ? Node.Status.DISCONNECTED
+ : Node.Status.CONNECTED;
+ }
+
+ result.put(nodeResponse, status);
+ }
+
+ return result;
+ }
+
+}