You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 18:04:11 UTC
[22/79] [abbrv] [partial] incubator-nifi git commit: NIFI-270 Made
all changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml
deleted file mode 100644
index bad5a29..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-framework-parent</artifactId>
- <version>0.0.1-incubating-SNAPSHOT</version>
- </parent>
- <artifactId>framework-cluster-protocol</artifactId>
- <packaging>jar</packaging>
- <name>NiFi Framework Cluster Protocol</name>
- <description>The messaging protocol for clustered NiFi</description>
- <dependencies>
-
- <!-- application dependencies -->
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-properties</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-logging-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-socket-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>core-api</artifactId>
- </dependency>
-
- <!-- spring dependencies -->
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-beans</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
deleted file mode 100644
index fa1547f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterManagerProtocolSender.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
-import org.apache.nifi.reporting.BulletinRepository;
-
-/**
- * An interface for sending protocol messages from the cluster manager to nodes.
- *
- * @author unattributed
- */
-public interface ClusterManagerProtocolSender {
-
- /**
- * Sends a "flow request" message to a node.
- *
- * @param msg the flow request message to send
- * @return the response message for the request
- * @throws ProtocolException if communication failed
- */
- FlowResponseMessage requestFlow(FlowRequestMessage msg) throws ProtocolException;
-
- /**
- * Sends a "reconnection request" message to a node.
- *
- * @param msg the reconnection request message to send
- * @return the response message for the request
- * @throws ProtocolException if communication failed
- */
- ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage msg) throws ProtocolException;
-
- /**
- * Sends a "disconnection request" message to a node. No response is
- * returned to the caller.
- *
- * @param msg the disconnect message to send
- * @throws ProtocolException if communication failed
- */
- void disconnect(DisconnectMessage msg) throws ProtocolException;
-
- /**
- * Sends an "assign primary role" message to a node. No response is
- * returned to the caller.
- *
- * @param msg the primary role assignment message to send
- * @throws ProtocolException if communication failed
- */
- void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException;
-
- /**
- * Sets the {@link BulletinRepository} that can be used to report bulletins.
- *
- * @param bulletinRepository the repository to use for reporting bulletins
- */
- void setBulletinRepository(final BulletinRepository bulletinRepository);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
deleted file mode 100644
index 1b5d007..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionRequest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionRequestAdapter;
-
-/**
- * Represents the request a node submits when it wants to join the cluster.
- * The request carries the identifier that the node proposes for itself.
- *
- * @author unattributed
- */
-@XmlJavaTypeAdapter(ConnectionRequestAdapter.class)
-public class ConnectionRequest {
-
-    /** The identifier proposed by the requesting node; never null. */
-    private final NodeIdentifier proposedNodeIdentifier;
-
-    /**
-     * Creates a connection request for the given proposed identifier.
-     *
-     * @param proposedNodeIdentifier the identifier the node proposes; must not be null
-     * @throws IllegalArgumentException if the proposed identifier is null
-     */
-    public ConnectionRequest(final NodeIdentifier proposedNodeIdentifier) {
-        if (null == proposedNodeIdentifier) {
-            throw new IllegalArgumentException("Proposed node identifier may not be null.");
-        }
-
-        this.proposedNodeIdentifier = proposedNodeIdentifier;
-    }
-
-    /**
-     * @return the identifier proposed by the node
-     */
-    public NodeIdentifier getProposedNodeIdentifier() {
-        return this.proposedNodeIdentifier;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
deleted file mode 100644
index 7a5ff2b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ConnectionResponse.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.jaxb.message.ConnectionResponseAdapter;
-
-/**
- * The cluster manager's response to a node's connection request. If the manager
- * has a current copy of the data flow, then it is returned with a node identifier
- * to the node. Otherwise, the manager will provide a "try again in X seconds"
- * response to the node in hopes that a current data flow will be available upon
- * subsequent requests.
- *
- * A response takes one of three shapes, depending on how it was created:
- * an accepted response (node identifier and data flow are set), a try-later
- * response (only the try-later seconds are set), or a blocked-by-firewall
- * response (see {@link #createBlockedByFirewallResponse()}).
- *
- * @author unattributed
- */
-@XmlJavaTypeAdapter(ConnectionResponseAdapter.class)
-public class ConnectionResponse {
-
-    private final boolean blockedByFirewall;
-    private final int tryLaterSeconds;
-    private final NodeIdentifier nodeIdentifier;
-    private final StandardDataFlow dataFlow;
-    private final boolean primary;
-    private final Integer managerRemoteInputPort;
-    private final Boolean managerRemoteCommsSecure;
-    private final String instanceId;
-
-    // The only mutable state: set after construction via setClusterManagerDN.
-    private volatile String clusterManagerDN;
-
-    /**
-     * Creates a response that carries a node identifier and a data flow.
-     * Try-later seconds are zero and the firewall flag is false.
-     *
-     * @param nodeIdentifier the identifier for the node; must not be null
-     * @param dataFlow the data flow for the node; must not be null
-     * @param primary value reported by {@link #isPrimary()}
-     * @param managerRemoteInputPort value reported by {@link #getManagerRemoteInputPort()}; may be null
-     * @param managerRemoteCommsSecure value reported by {@link #isManagerRemoteCommsSecure()}; may be null
-     * @param instanceId value reported by {@link #getInstanceId()}; may be null
-     * @throws IllegalArgumentException if the node identifier or data flow is null
-     */
-    public ConnectionResponse(final NodeIdentifier nodeIdentifier, final StandardDataFlow dataFlow, final boolean primary,
-            final Integer managerRemoteInputPort, final Boolean managerRemoteCommsSecure, final String instanceId) {
-        if (nodeIdentifier == null) {
-            throw new IllegalArgumentException("Node identifier may not be empty or null.");
-        } else if (dataFlow == null) {
-            throw new IllegalArgumentException("DataFlow may not be null.");
-        }
-        this.nodeIdentifier = nodeIdentifier;
-        this.dataFlow = dataFlow;
-        this.tryLaterSeconds = 0;
-        this.blockedByFirewall = false;
-        this.primary = primary;
-        this.managerRemoteInputPort = managerRemoteInputPort;
-        this.managerRemoteCommsSecure = managerRemoteCommsSecure;
-        this.instanceId = instanceId;
-    }
-
-    /**
-     * Creates a "try again later" response. No node identifier or data flow
-     * is carried.
-     *
-     * @param tryLaterSeconds the number of seconds to wait before retrying; must be greater than zero
-     * @throws IllegalArgumentException if the given value is zero or negative
-     */
-    public ConnectionResponse(final int tryLaterSeconds) {
-        if (tryLaterSeconds <= 0) {
-            // The check rejects zero and negatives, so the message states the actual rule.
-            throw new IllegalArgumentException("Try-Later seconds must be positive: " + tryLaterSeconds);
-        }
-        this.dataFlow = null;
-        this.nodeIdentifier = null;
-        this.tryLaterSeconds = tryLaterSeconds;
-        this.blockedByFirewall = false;
-        this.primary = false;
-        this.managerRemoteInputPort = null;
-        this.managerRemoteCommsSecure = null;
-        this.instanceId = null;
-    }
-
-    /**
-     * Creates a blocked-by-firewall response. Kept private; callers use
-     * {@link #createBlockedByFirewallResponse()}.
-     */
-    private ConnectionResponse() {
-        this.dataFlow = null;
-        this.nodeIdentifier = null;
-        this.tryLaterSeconds = 0;
-        this.blockedByFirewall = true;
-        this.primary = false;
-        this.managerRemoteInputPort = null;
-        this.managerRemoteCommsSecure = null;
-        this.instanceId = null;
-    }
-
-    /**
-     * @return a new response whose {@link #isBlockedByFirewall()} is true
-     */
-    public static ConnectionResponse createBlockedByFirewallResponse() {
-        return new ConnectionResponse();
-    }
-
-    /**
-     * @return the primary flag given at construction; false for try-later and firewall responses
-     */
-    public boolean isPrimary() {
-        return primary;
-    }
-
-    /**
-     * @return true if the try-later seconds are greater than zero
-     */
-    public boolean shouldTryLater() {
-        return tryLaterSeconds > 0;
-    }
-
-    /**
-     * @return true if this response was created by {@link #createBlockedByFirewallResponse()}
-     */
-    public boolean isBlockedByFirewall() {
-        return blockedByFirewall;
-    }
-
-    /**
-     * @return the try-later seconds; zero unless this is a try-later response
-     */
-    public int getTryLaterSeconds() {
-        return tryLaterSeconds;
-    }
-
-    /**
-     * @return the data flow, or <code>null</code> for try-later and firewall responses
-     */
-    public StandardDataFlow getDataFlow() {
-        return dataFlow;
-    }
-
-    /**
-     * @return the node identifier, or <code>null</code> for try-later and firewall responses
-     */
-    public NodeIdentifier getNodeIdentifier() {
-        return nodeIdentifier;
-    }
-
-    /**
-     * @return the manager remote input port given at construction; may be <code>null</code>
-     */
-    public Integer getManagerRemoteInputPort() {
-        return managerRemoteInputPort;
-    }
-
-    /**
-     * @return the manager remote comms secure flag given at construction; may be <code>null</code>
-     */
-    public Boolean isManagerRemoteCommsSecure() {
-        return managerRemoteCommsSecure;
-    }
-
-    /**
-     * @return the instance id given at construction; may be <code>null</code>
-     */
-    public String getInstanceId() {
-        return instanceId;
-    }
-
-    /**
-     * Sets the DN of the cluster manager.
-     *
-     * @param dn the DN to record; may be <code>null</code>
-     */
-    public void setClusterManagerDN(final String dn) {
-        this.clusterManagerDN = dn;
-    }
-
-    /**
-     * Returns the DN of the NCM, if it is available or <code>null</code> otherwise.
-     *
-     * @return the DN previously passed to {@link #setClusterManagerDN(String)}, or
-     * <code>null</code> if none has been set
-     */
-    public String getClusterManagerDN() {
-        return clusterManagerDN;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
deleted file mode 100644
index 67324a1..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/Heartbeat.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import java.util.Date;
-import javax.xml.bind.annotation.XmlTransient;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.jaxb.message.HeartbeatAdapter;
-
-/**
- * A heartbeat that a node uses to indicate its status to the cluster. The
- * creation time is captured when the instance is constructed.
- *
- * @author unattributed
- */
-@XmlJavaTypeAdapter(HeartbeatAdapter.class)
-public class Heartbeat {
-
-    private final NodeIdentifier nodeIdentifier;
-    private final boolean primary;
-    private final boolean connected;
-    private final long createdTimestamp;
-    private final byte[] payload;
-
-    /**
-     * Creates a heartbeat stamped with the current time.
-     *
-     * @param nodeIdentifier the identifier of the node; must not be null
-     * @param primary value reported by {@link #isPrimary()}
-     * @param connected value reported by {@link #isConnected()}
-     * @param payload the payload bytes; stored as given, without copying
-     * @throws IllegalArgumentException if the node identifier is null
-     */
-    public Heartbeat(final NodeIdentifier nodeIdentifier, final boolean primary, final boolean connected, final byte[] payload) {
-        if (null == nodeIdentifier) {
-            throw new IllegalArgumentException("Node Identifier may not be null.");
-        }
-
-        this.nodeIdentifier = nodeIdentifier;
-        this.primary = primary;
-        this.connected = connected;
-        this.payload = payload;
-
-        final Date now = new Date();
-        this.createdTimestamp = now.getTime();
-    }
-
-    /**
-     * @return the identifier of the node given at construction
-     */
-    public NodeIdentifier getNodeIdentifier() {
-        return this.nodeIdentifier;
-    }
-
-    /**
-     * @return the payload array given at construction (not a copy)
-     */
-    public byte[] getPayload() {
-        return this.payload;
-    }
-
-    /**
-     * @return the primary flag given at construction
-     */
-    public boolean isPrimary() {
-        return this.primary;
-    }
-
-    /**
-     * @return the connected flag given at construction
-     */
-    public boolean isConnected() {
-        return this.connected;
-    }
-
-    /**
-     * @return the time this heartbeat was constructed, in milliseconds since the epoch
-     */
-    @XmlTransient
-    public long getCreatedTimestamp() {
-        return this.createdTimestamp;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
deleted file mode 100644
index a120524..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeBulletins.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-import org.apache.nifi.cluster.protocol.jaxb.message.NodeBulletinsAdapter;
-
-/**
- * Pairs a node identifier with a bulletin payload held as raw bytes.
- */
-@XmlJavaTypeAdapter(NodeBulletinsAdapter.class)
-public class NodeBulletins {
-
-    private final NodeIdentifier nodeIdentifier;
-    private final byte[] payload;
-
-    /**
-     * Creates a new instance. Neither argument is validated or copied.
-     *
-     * @param nodeIdentifier the identifier of the node
-     * @param payload the payload bytes
-     */
-    public NodeBulletins(final NodeIdentifier nodeIdentifier, final byte[] payload) {
-        this.payload = payload;
-        this.nodeIdentifier = nodeIdentifier;
-    }
-
-    /**
-     * @return the node identifier given at construction
-     */
-    public NodeIdentifier getNodeIdentifier() {
-        return this.nodeIdentifier;
-    }
-
-    /**
-     * @return the payload array given at construction (not a copy)
-     */
-    public byte[] getPayload() {
-        return this.payload;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
deleted file mode 100644
index 1893186..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeIdentifier.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * Identifies a node, i.e. the coordinates of a flow controller that is connected
- * to a cluster. A node exposes an external public API interface and an internal
- * private interface used for communicating with the cluster.
- *
- * Both the external API interface and the internal protocol need an IP or
- * hostname plus a port.
- *
- * {@link #equals(Object)} and {@link #hashCode()} are based on the ID only: two
- * instances are equal if they have equal IDs. Use
- * {@link #logicallyEquals(NodeIdentifier)} to compare addresses and ports instead.
- *
- * All fields are final and assigned once in the constructor.
- *
- * @author unattributed
- * @Immutable
- * @Threadsafe
- */
-public class NodeIdentifier {
-
-    /** the unique identifier for the node */
-    private final String id;
-
-    /** the IP or hostname to use for sending requests to the node's external interface */
-    private final String apiAddress;
-
-    /** the port to use for sending requests to the node's external interface */
-    private final int apiPort;
-
-    /** the IP or hostname to use for sending requests to the node's internal interface */
-    private final String socketAddress;
-
-    /** the port to use for sending requests to the node's internal interface */
-    private final int socketPort;
-
-    /** the DN of the node; null when constructed without one */
-    private final String nodeDn;
-
-    /**
-     * Creates an identifier without a DN.
-     *
-     * @param id the unique identifier for the node; must not be blank
-     * @param apiAddress the external interface address; must not be blank
-     * @param apiPort the external interface port, in [1, 65535]
-     * @param socketAddress the internal interface address; must not be blank
-     * @param socketPort the internal interface port, in [1, 65535]
-     * @throws IllegalArgumentException if any argument fails validation
-     */
-    public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort) {
-        this(id, apiAddress, apiPort, socketAddress, socketPort, null);
-    }
-
-    /**
-     * Creates an identifier with an optional DN.
-     *
-     * @param id the unique identifier for the node; must not be blank
-     * @param apiAddress the external interface address; must not be blank
-     * @param apiPort the external interface port, in [1, 65535]
-     * @param socketAddress the internal interface address; must not be blank
-     * @param socketPort the internal interface port, in [1, 65535]
-     * @param dn the DN of the node; not validated, may be null
-     * @throws IllegalArgumentException if any argument fails validation
-     */
-    public NodeIdentifier(final String id, final String apiAddress, final int apiPort, final String socketAddress, final int socketPort, final String dn) {
-
-        // Textual arguments are checked first, in declaration order, then the ports.
-        if (StringUtils.isBlank(id)) {
-            throw new IllegalArgumentException("Node ID may not be empty or null.");
-        }
-        if (StringUtils.isBlank(apiAddress)) {
-            throw new IllegalArgumentException("Node API address may not be empty or null.");
-        }
-        if (StringUtils.isBlank(socketAddress)) {
-            throw new IllegalArgumentException("Node socket address may not be empty or null.");
-        }
-
-        validatePort(apiPort);
-        validatePort(socketPort);
-
-        this.id = id;
-        this.nodeDn = dn;
-        this.apiAddress = apiAddress;
-        this.apiPort = apiPort;
-        this.socketAddress = socketAddress;
-        this.socketPort = socketPort;
-    }
-
-    public String getId() {
-        return this.id;
-    }
-
-    public String getDN() {
-        return this.nodeDn;
-    }
-
-    public String getApiAddress() {
-        return this.apiAddress;
-    }
-
-    public int getApiPort() {
-        return this.apiPort;
-    }
-
-    public String getSocketAddress() {
-        return this.socketAddress;
-    }
-
-    public int getSocketPort() {
-        return this.socketPort;
-    }
-
-    /**
-     * Rejects ports outside the range [1, 65535].
-     *
-     * @param port the port to check
-     * @throws IllegalArgumentException if the port is out of range
-     */
-    private void validatePort(final int port) {
-        final boolean inRange = port >= 1 && port <= 65535;
-        if (!inRange) {
-            throw new IllegalArgumentException("Port must be inclusively in the range [1, 65535]. Port given: " + port);
-        }
-    }
-
-    /**
-     * Compares the id of two node identifiers for equality.
-     *
-     * @param obj a node identifier
-     *
-     * @return true if the other object has the same class and an equal id; false otherwise
-     */
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final NodeIdentifier that = (NodeIdentifier) obj;
-        return (this.id == null) ? (that.id == null) : this.id.equals(that.id);
-    }
-
-    /**
-     * Compares API address/port and socket address/port for equality. The
-     * id is not used for comparison.
-     *
-     * @param other a node identifier
-     *
-     * @return true if API address/port and socket address/port are equal; false
-     * otherwise (including when the argument is null)
-     */
-    public boolean logicallyEquals(final NodeIdentifier other) {
-        if (other == null) {
-            return false;
-        }
-
-        final boolean sameApiAddress = (this.apiAddress == null)
-                ? (other.apiAddress == null)
-                : this.apiAddress.equals(other.apiAddress);
-        final boolean sameSocketAddress = (this.socketAddress == null)
-                ? (other.socketAddress == null)
-                : this.socketAddress.equals(other.socketAddress);
-
-        return sameApiAddress
-                && this.apiPort == other.apiPort
-                && sameSocketAddress
-                && this.socketPort == other.socketPort;
-    }
-
-    /**
-     * @return a hash derived from the id only, consistent with {@link #equals(Object)}
-     */
-    @Override
-    public int hashCode() {
-        final int idHash = (this.id == null) ? 0 : this.id.hashCode();
-        return 31 * 7 + idHash;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder text = new StringBuilder("[id=");
-        text.append(id);
-        text.append(", apiAddress=").append(apiAddress);
-        text.append(", apiPort=").append(apiPort);
-        text.append(", socketAddress=").append(socketAddress);
-        text.append(", socketPort=").append(socketPort);
-        text.append(']');
-        return text.toString();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
deleted file mode 100644
index 1edcb91..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/NodeProtocolSender.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
-import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
-import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-
-/**
- * An interface for sending protocol messages from a node to the cluster manager.
- * @author unattributed
- */
-public interface NodeProtocolSender {
-
- /**
- * Sends a "connection request" message to the cluster manager.
- * @param msg a message
- * @return the response
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
- * @throws ProtocolException if communication failed
- */
- ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
- /**
- * Sends a "heartbeat" message to the cluster manager.
- * @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
- * @throws ProtocolException if communication failed
- */
- void heartbeat(HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
- /**
- * Sends a bulletins message to the cluster manager.
- * @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
- * @throws ProtocolException if communication failed
- */
- void sendBulletins(NodeBulletinsMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
- /**
- * Sends a failure notification if the controller was unable to start.
- * @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
- * @throws ProtocolException if communication failed
- */
- void notifyControllerStartupFailure(ControllerStartupFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
- /**
- * Sends a failure notification if the node was unable to reconnect to the cluster.
- * @param msg a message
- * @throws UnknownServiceAddressException if the cluster manager's address is not known
- * @throws ProtocolException if communication failed
- */
- void notifyReconnectionFailure(ReconnectionFailureMessage msg) throws ProtocolException, UnknownServiceAddressException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
deleted file mode 100644
index b614e76..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolContext.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-/**
- * The context for communicating using the internal cluster protocol. It
- * supplies the marshaller and unmarshaller used for protocol messages.
- *
- * @param <T> The type of protocol message.
- *
- * @author unattributed
- */
-public interface ProtocolContext<T> {
-
- /**
- * Creates a marshaller for serializing protocol messages.
- * @return a marshaller
- */
- ProtocolMessageMarshaller<T> createMarshaller();
-
- /**
- * Creates an unmarshaller for deserializing protocol messages.
- * @return an unmarshaller
- */
- ProtocolMessageUnmarshaller<T> createUnmarshaller();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
deleted file mode 100644
index f11ad84..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-/**
- * The base exception for problems encountered while communicating within the
- * cluster.
- * @author unattributed
- */
-public class ProtocolException extends RuntimeException {
-
- public ProtocolException() {
- }
-
- public ProtocolException(String msg) {
- super(msg);
- }
-
- public ProtocolException(Throwable cause) {
- super(cause);
- }
-
- public ProtocolException(String msg, Throwable cause) {
- super(msg, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
deleted file mode 100644
index 6de87db..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-
-/**
- * A handler for processing protocol messages.
- * @author unattributed
- */
-public interface ProtocolHandler {
-
- /**
- * Handles the given protocol message or throws an exception if it cannot
- * handle the message. If no response is needed by the protocol, then null
- * should be returned.
- *
- * @param msg a message
- * @return a response or null, if no response is necessary
- *
- * @throws ProtocolException if the message could not be processed
- */
- ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException;
-
- /**
- * @param msg
- * @return true if the handler can process the given message; false otherwise
- */
- boolean canHandle(ProtocolMessage msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
deleted file mode 100644
index 32f0f5d..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolListener.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.reporting.BulletinRepository;
-
-/**
- * Defines the interface for a listener to process protocol messages.
- * @author unattributed
- */
-public interface ProtocolListener {
-
- /**
- * Starts the instance for listening for messages. Start may only be called
- * if the instance is not running.
- * @throws java.io.IOException
- */
- void start() throws IOException;
-
- /**
- * Stops the instance from listening for messages. Stop may only be called
- * if the instance is running.
- * @throws java.io.IOException
- */
- void stop() throws IOException;
-
- /**
- * @return true if the instance is started; false otherwise.
- */
- boolean isRunning();
-
- /**
- * @return the handlers registered with the listener
- */
- Collection<ProtocolHandler> getHandlers();
-
- /**
- * Registers a handler with the listener.
- * @param handler a handler
- */
- void addHandler(ProtocolHandler handler);
-
- /**
- * Sets the BulletinRepository that can be used to report bulletins
- * @param bulletinRepository
- */
- void setBulletinRepository(BulletinRepository bulletinRepository);
-
- /**
- * Unregisters the handler with the listener.
- * @param handler a handler
- * @return true if the handler was removed; false otherwise
- */
- boolean removeHandler(ProtocolHandler handler);
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
deleted file mode 100644
index bb436e0..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageMarshaller.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Defines a marshaller for serializing protocol messages.
- *
- * @param <T> The type of protocol message.
- *
- * @author unattributed
- */
-public interface ProtocolMessageMarshaller<T> {
-
- /**
- * Serializes the given message to the given output stream.
- * @param msg a message
- * @param os an output stream
- * @throws IOException if the message could not be serialized to the stream
- */
- void marshal(T msg, OutputStream os) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
deleted file mode 100644
index c690e7b..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ProtocolMessageUnmarshaller.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Defines an unmarshaller for deserializing protocol messages.
- *
- * @param <T> The type of protocol message.
- *
- * @author unattributed
- */
-public interface ProtocolMessageUnmarshaller<T> {
-
- /**
- * Deserializes a message on the given input stream.
- * @param is an input stream
- * @return
- * @throws IOException if the message could not be deserialized from the stream
- */
- T unmarshal(InputStream is) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
deleted file mode 100644
index c2d16fc..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/StandardDataFlow.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-import org.apache.nifi.cluster.protocol.DataFlow;
-import java.io.Serializable;
-import java.util.Arrays;
-
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-import org.apache.nifi.cluster.protocol.jaxb.message.DataFlowAdapter;
-
-/**
- * Represents a dataflow, which includes the raw bytes of the flow.xml and
- * whether processors should be started automatically at application startup.
- */
-@XmlJavaTypeAdapter(DataFlowAdapter.class)
-public class StandardDataFlow implements Serializable, DataFlow {
-
- private final byte[] flow;
- private final byte[] templateBytes;
- private final byte[] snippetBytes;
-
- private boolean autoStartProcessors;
-
- /**
- * Constructs an instance.
- *
- * @param flow a valid flow as bytes, which cannot be null
- * @param templateBytes an XML representation of templates
- * @param snippetBytes an XML representation of snippets
- *
- * @throws NullPointerException if any argument is null
- */
- public StandardDataFlow(final byte[] flow, final byte[] templateBytes, final byte[] snippetBytes) {
- this.flow = flow;
- this.templateBytes = templateBytes;
- this.snippetBytes = snippetBytes;
- }
-
- public StandardDataFlow(final DataFlow toCopy) {
- this.flow = copy(toCopy.getFlow());
- this.templateBytes = copy(toCopy.getTemplates());
- this.snippetBytes = copy(toCopy.getSnippets());
- this.autoStartProcessors = toCopy.isAutoStartProcessors();
- }
-
- private static byte[] copy(final byte[] bytes) {
- return bytes == null ? null : Arrays.copyOf(bytes, bytes.length);
- }
-
- /**
- * @return the raw byte array of the flow
- */
- public byte[] getFlow() {
- return flow;
- }
-
- /**
- * @return the raw byte array of the templates
- */
- public byte[] getTemplates() {
- return templateBytes;
- }
-
- /**
- * @return the raw byte array of the snippets
- */
- public byte[] getSnippets() {
- return snippetBytes;
- }
-
- /**
- * @return true if processors should be automatically started at application
- * startup; false otherwise
- */
- public boolean isAutoStartProcessors() {
- return autoStartProcessors;
- }
-
- /**
- *
- * Sets the flag to automatically start processors at application startup.
- *
- * @param autoStartProcessors true if processors should be automatically
- * started at application startup; false otherwise
- */
- public void setAutoStartProcessors(final boolean autoStartProcessors) {
- this.autoStartProcessors = autoStartProcessors;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
deleted file mode 100644
index 41c74eb..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/UnknownServiceAddressException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol;
-
-/**
- * Represents the exceptional case when a service's address is not known.
- * @author unattributed
- */
-public class UnknownServiceAddressException extends RuntimeException {
-
- public UnknownServiceAddressException() {
- }
-
- public UnknownServiceAddressException(String msg) {
- super(msg);
- }
-
- public UnknownServiceAddressException(Throwable cause) {
- super(cause);
- }
-
- public UnknownServiceAddressException(String msg, Throwable cause) {
- super(msg, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
deleted file mode 100644
index ceb3fcb..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImpl.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.NodeIdentifier;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
-import org.apache.nifi.io.socket.SocketConfiguration;
-import org.apache.nifi.io.socket.SocketUtils;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.apache.nifi.util.FormatUtils;
-
-/**
- * A protocol sender for sending protocol messages from the cluster manager to
- * nodes.
- *
- * Connection-type requests (e.g., reconnection, disconnection) by nature of
- * starting/stopping flow controllers take longer than other types of protocol
- * messages. Therefore, a handshake timeout may be specified to lengthen the
- * allowable time for communication with the node.
- *
- * @author unattributed
- */
-public class ClusterManagerProtocolSenderImpl implements ClusterManagerProtocolSender {
-
-
- private final ProtocolContext<ProtocolMessage> protocolContext;
- private final SocketConfiguration socketConfiguration;
- private int handshakeTimeoutSeconds;
- private volatile BulletinRepository bulletinRepository;
-
- public ClusterManagerProtocolSenderImpl(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
- if(socketConfiguration == null) {
- throw new IllegalArgumentException("Socket configuration may not be null.");
- } else if(protocolContext == null) {
- throw new IllegalArgumentException("Protocol Context may not be null.");
- }
- this.socketConfiguration = socketConfiguration;
- this.protocolContext = protocolContext;
- this.handshakeTimeoutSeconds = -1; // less than zero denotes variable not configured
- }
-
- @Override
- public void setBulletinRepository(final BulletinRepository bulletinRepository) {
- this.bulletinRepository = bulletinRepository;
- }
-
- /**
- * Requests the data flow from a node.
- * @param msg a message
- * @return the message response
- * @throws @throws ProtocolException if the message failed to be sent or the response was malformed
- */
- @Override
- public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
- Socket socket = null;
- try {
- socket = createSocket(msg.getNodeId(), false);
-
- try {
- // marshal message to output stream
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
-
- final ProtocolMessage response;
- try {
- // unmarshall response and return
- final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
- response = unmarshaller.unmarshal(socket.getInputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed unmarshalling '" + MessageType.FLOW_RESPONSE + "' protocol message due to: " + ioe, ioe);
- }
-
- if(MessageType.FLOW_RESPONSE == response.getType()) {
- return (FlowResponseMessage) response;
- } else {
- throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
- }
-
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
- /**
- * Requests a node to reconnect to the cluster. The configured value for
- * handshake timeout is applied to the socket before making the request.
- * @param msg a message
- * @return the response
- * @throws ProtocolException if the message failed to be sent or the response was malformed
- */
- @Override
- public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
- Socket socket = null;
- try {
- socket = createSocket(msg.getNodeId(), true);
-
- // marshal message to output stream
- try {
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
-
-
- final ProtocolMessage response;
- try {
- // unmarshall response and return
- final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
- response = unmarshaller.unmarshal(socket.getInputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed unmarshalling '" + MessageType.RECONNECTION_RESPONSE + "' protocol message due to: " + ioe, ioe);
- }
-
- if(MessageType.RECONNECTION_RESPONSE == response.getType()) {
- return (ReconnectionResponseMessage) response;
- } else {
- throw new ProtocolException("Expected message type '" + MessageType.FLOW_RESPONSE + "' but found '" + response.getType() + "'");
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
- /**
- * Requests a node to disconnect from the cluster. The configured value for
- * handshake timeout is applied to the socket before making the request.
- * @param msg a message
- * @throws ProtocolException if the message failed to be sent
- */
- @Override
- public void disconnect(final DisconnectMessage msg) throws ProtocolException {
- Socket socket = null;
- try {
- socket = createSocket(msg.getNodeId(), true);
-
- // marshal message to output stream
- try {
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
- /**
- * Assigns the primary role to a node.
- *
- * @param msg a message
- *
- * @throws ProtocolException if the message failed to be sent
- */
- @Override
- public void assignPrimaryRole(final PrimaryRoleAssignmentMessage msg) throws ProtocolException {
- Socket socket = null;
- try {
- socket = createSocket(msg.getNodeId(), true);
-
- try {
- // marshal message to output stream
- final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
- marshaller.marshal(msg, socket.getOutputStream());
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
- }
- } finally {
- SocketUtils.closeQuietly(socket);
- }
- }
-
-
- private void setConnectionHandshakeTimeoutOnSocket(final Socket socket) throws SocketException {
- // update socket timeout, if handshake timeout was set; otherwise use socket's current timeout
- if(handshakeTimeoutSeconds >= 0) {
- socket.setSoTimeout(handshakeTimeoutSeconds * 1000);
- }
- }
-
- public SocketConfiguration getSocketConfiguration() {
- return socketConfiguration;
- }
-
- public int getHandshakeTimeoutSeconds() {
- return handshakeTimeoutSeconds;
- }
-
- public void setHandshakeTimeout(final String handshakeTimeout) {
- this.handshakeTimeoutSeconds = (int) FormatUtils.getTimeDuration(handshakeTimeout, TimeUnit.SECONDS);
- }
-
- private Socket createSocket(final NodeIdentifier nodeId, final boolean applyHandshakeTimeout) {
- return createSocket(nodeId.getSocketAddress(), nodeId.getSocketPort(), applyHandshakeTimeout);
- }
-
- private Socket createSocket(final String host, final int port, final boolean applyHandshakeTimeout) {
- try {
- // create a socket
- final Socket socket = SocketUtils.createSocket(InetSocketAddress.createUnresolved(host, port), socketConfiguration);
- if ( applyHandshakeTimeout ) {
- setConnectionHandshakeTimeoutOnSocket(socket);
- }
- return socket;
- } catch(final IOException ioe) {
- throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
deleted file mode 100644
index 933e5fa..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderListener.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
-import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
-import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
-import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
-import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
-import org.apache.nifi.reporting.BulletinRepository;
-
-/**
- * A wrapper class for consolidating a protocol sender and listener for the cluster
- * manager.
- *
- * @author unattributed
- */
-public class ClusterManagerProtocolSenderListener implements ClusterManagerProtocolSender, ProtocolListener {
-
- private final ClusterManagerProtocolSender sender;
-
- private final ProtocolListener listener;
-
- public ClusterManagerProtocolSenderListener(final ClusterManagerProtocolSender sender, final ProtocolListener listener) {
- if(sender == null) {
- throw new IllegalArgumentException("ClusterManagerProtocolSender may not be null.");
- } else if(listener == null) {
- throw new IllegalArgumentException("ProtocolListener may not be null.");
- }
- this.sender = sender;
- this.listener = listener;
- }
-
- @Override
- public void stop() throws IOException {
- if(!isRunning()) {
- throw new IllegalStateException("Instance is already stopped.");
- }
- listener.stop();
- }
-
- @Override
- public void start() throws IOException {
- if(isRunning()) {
- throw new IllegalStateException("Instance is already started.");
- }
- listener.start();
- }
-
- @Override
- public boolean isRunning() {
- return listener.isRunning();
- }
-
- @Override
- public boolean removeHandler(final ProtocolHandler handler) {
- return listener.removeHandler(handler);
- }
-
- @Override
- public Collection<ProtocolHandler> getHandlers() {
- return listener.getHandlers();
- }
-
- @Override
- public void addHandler(final ProtocolHandler handler) {
- listener.addHandler(handler);
- }
-
- @Override
- public void setBulletinRepository(final BulletinRepository bulletinRepository) {
- listener.setBulletinRepository(bulletinRepository);
- sender.setBulletinRepository(bulletinRepository);
- }
-
- @Override
- public FlowResponseMessage requestFlow(final FlowRequestMessage msg) throws ProtocolException {
- return sender.requestFlow(msg);
- }
-
- @Override
- public ReconnectionResponseMessage requestReconnection(final ReconnectionRequestMessage msg) throws ProtocolException {
- return sender.requestReconnection(msg);
- }
-
- @Override
- public void disconnect(DisconnectMessage msg) throws ProtocolException {
- sender.disconnect(msg);
- }
-
- @Override
- public void assignPrimaryRole(PrimaryRoleAssignmentMessage msg) throws ProtocolException {
- sender.assignPrimaryRole(msg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
deleted file mode 100644
index 24e51e0..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscovery.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
-import org.apache.nifi.io.socket.multicast.MulticastServiceDiscovery;
-import org.apache.nifi.reporting.BulletinRepository;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.cluster.protocol.ProtocolHandler;
-import org.apache.nifi.cluster.protocol.ProtocolListener;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
-import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
-import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation for discovering services by way of "service broadcast" type
- * protocol messages over multicast.
- *
- * The client caller is responsible for starting and stopping the service
- * discovery. The instance must be stopped before termination of the JVM to
- * ensure proper resource clean-up.
- *
- * @author unattributed
- */
-public class ClusterServiceDiscovery implements MulticastServiceDiscovery, ProtocolListener {
-
- private static final Logger logger = LoggerFactory.getLogger(ClusterServiceDiscovery.class);
- private final String serviceName;
- private final MulticastConfiguration multicastConfiguration;
- private final MulticastProtocolListener listener;
- private volatile BulletinRepository bulletinRepository;
-
- /*
- * guarded by this
- */
- private DiscoverableService service;
-
-
- public ClusterServiceDiscovery(final String serviceName, final InetSocketAddress multicastAddress,
- final MulticastConfiguration multicastConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
-
- if (StringUtils.isBlank(serviceName)) {
- throw new IllegalArgumentException("Service name may not be null or empty.");
- } else if (multicastAddress == null) {
- throw new IllegalArgumentException("Multicast address may not be null.");
- } else if (multicastAddress.getAddress().isMulticastAddress() == false) {
- throw new IllegalArgumentException("Multicast group must be a Class D address.");
- } else if (protocolContext == null) {
- throw new IllegalArgumentException("Protocol Context may not be null.");
- } else if (multicastConfiguration == null) {
- throw new IllegalArgumentException("Multicast configuration may not be null.");
- }
-
- this.serviceName = serviceName;
- this.multicastConfiguration = multicastConfiguration;
- this.listener = new MulticastProtocolListener(1, multicastAddress, multicastConfiguration, protocolContext);
- listener.addHandler(new ClusterManagerServiceBroadcastHandler());
- }
-
- @Override
- public void setBulletinRepository(final BulletinRepository bulletinRepository) {
- this.bulletinRepository = bulletinRepository;
- }
-
- @Override
- public synchronized DiscoverableService getService() {
- return service;
- }
-
- @Override
- public InetSocketAddress getMulticastAddress() {
- return listener.getMulticastAddress();
- }
-
- @Override
- public Collection<ProtocolHandler> getHandlers() {
- return Collections.unmodifiableCollection(listener.getHandlers());
- }
-
- @Override
- public void addHandler(ProtocolHandler handler) {
- listener.addHandler(handler);
- }
-
- @Override
- public boolean removeHandler(ProtocolHandler handler) {
- return listener.removeHandler(handler);
- }
-
- @Override
- public boolean isRunning() {
- return listener.isRunning();
- }
-
- @Override
- public void start() throws IOException {
- if (isRunning()) {
- throw new IllegalStateException("Instance is already running.");
- }
- listener.start();
- }
-
- @Override
- public void stop() throws IOException {
- if (isRunning() == false) {
- throw new IllegalStateException("Instance is already stopped.");
- }
- listener.stop();
- }
-
- public String getServiceName() {
- return serviceName;
- }
-
- public MulticastConfiguration getMulticastConfiguration() {
- return multicastConfiguration;
- }
-
- private class ClusterManagerServiceBroadcastHandler implements ProtocolHandler {
-
- @Override
- public boolean canHandle(final ProtocolMessage msg) {
- return MessageType.SERVICE_BROADCAST == msg.getType();
- }
-
- @Override
- public ProtocolMessage handle(final ProtocolMessage msg) throws ProtocolException {
- synchronized (ClusterServiceDiscovery.this) {
- if (canHandle(msg) == false) {
- throw new ProtocolException("Handler cannot handle message type: " + msg.getType());
- } else {
- final ServiceBroadcastMessage broadcastMsg = (ServiceBroadcastMessage) msg;
- if (serviceName.equals(broadcastMsg.getServiceName())) {
- final DiscoverableService oldService = service;
- if (oldService == null
- || broadcastMsg.getAddress().equalsIgnoreCase(oldService.getServiceAddress().getHostName()) == false
- || broadcastMsg.getPort() != oldService.getServiceAddress().getPort()) {
- service = new DiscoverableServiceImpl(serviceName, InetSocketAddress.createUnresolved(broadcastMsg.getAddress(), broadcastMsg.getPort()));
- final InetSocketAddress oldServiceAddress = (oldService == null) ? null : oldService.getServiceAddress();
- logger.info(String.format("Updating cluster service address for '%s' from '%s' to '%s'", serviceName, prettyPrint(oldServiceAddress), prettyPrint(service.getServiceAddress())));
- }
- }
- return null;
- }
- }
- }
- }
-
- private String prettyPrint(final InetSocketAddress address) {
- if (address == null) {
- return "0.0.0.0:0";
- } else {
- return address.getHostName() + ":" + address.getPort();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java b/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
deleted file mode 100644
index bebfde8..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocator.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster.protocol.impl;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.nifi.io.socket.multicast.DiscoverableService;
-import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import org.apache.nifi.io.socket.multicast.ServiceDiscovery;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the ServiceDiscovery interface for locating the socket address
- * of a cluster service. Depending on configuration, the address may be located
- * using service discovery. If using service discovery, then the service methods
- * must be used for starting and stopping discovery.
- *
- * Service discovery may be used in conjunction with a fixed port. In this case,
- * the service discovery will yield the service IP/host while the fixed port will
- * be used for the port.
- *
- * Alternatively, the instance may be configured with exact service location, in
- * which case, no service discovery occurs and the caller will always receive the
- * configured service.
- *
- * @author unattributed
- */
-public class ClusterServiceLocator implements ServiceDiscovery {
-
-    private static final Logger logger = LoggerFactory.getLogger(ClusterServiceLocator.class);
-
-    private final String serviceName;
-
-    private final ClusterServiceDiscovery serviceDiscovery;
-
-    private final DiscoverableService fixedService;
-
-    private final int fixedServicePort;
-
-    private final AttemptsConfig attemptsConfig = new AttemptsConfig();
-
-    private final AtomicBoolean running = new AtomicBoolean(false);
-
-    /**
-     * Creates a locator that uses the discovered host and port.
-     *
-     * @param serviceDiscovery the discovery to query; may not be null
-     * @throws IllegalArgumentException if serviceDiscovery is null
-     */
-    public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery) {
-        if (serviceDiscovery == null) {
-            throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
-        }
-        this.serviceDiscovery = serviceDiscovery;
-        this.fixedService = null;
-        this.fixedServicePort = 0;
-        this.serviceName = serviceDiscovery.getServiceName();
-    }
-
-    /**
-     * Creates a locator that uses the discovered host with a fixed port.
-     *
-     * @param serviceDiscovery the discovery to query; may not be null
-     * @param fixedServicePort the port to use; only applied when greater than zero
-     * @throws IllegalArgumentException if serviceDiscovery is null
-     */
-    public ClusterServiceLocator(final ClusterServiceDiscovery serviceDiscovery, final int fixedServicePort) {
-        if (serviceDiscovery == null) {
-            throw new IllegalArgumentException("ClusterServiceDiscovery may not be null.");
-        }
-        this.serviceDiscovery = serviceDiscovery;
-        this.fixedService = null;
-        this.fixedServicePort = fixedServicePort;
-        this.serviceName = serviceDiscovery.getServiceName();
-    }
-
-    /**
-     * Creates a locator that always returns the given service.
-     *
-     * @param fixedService the service to return; may not be null
-     * @throws IllegalArgumentException if fixedService is null
-     */
-    public ClusterServiceLocator(final DiscoverableService fixedService) {
-        if (fixedService == null) {
-            throw new IllegalArgumentException("Service may not be null.");
-        }
-        this.serviceDiscovery = null;
-        this.fixedService = fixedService;
-        this.fixedServicePort = 0;
-        this.serviceName = fixedService.getServiceName();
-    }
-
-    /**
-     * Locates the service, retrying according to the attempts configuration.
-     *
-     * If the waiting thread is interrupted, its interrupt status is restored
-     * and no further attempts are made.
-     *
-     * @return the fixed service if one was configured; otherwise the discovered
-     * service (with the fixed port applied when configured), or null if nothing
-     * was discovered within the configured attempts
-     */
-    @Override
-    public DiscoverableService getService() {
-
-        // a fixed service never changes, so there is nothing to retry; the
-        // attempt count is always at least one, so this matches returning it
-        // on the first loop iteration
-        if (fixedService != null) {
-            return fixedService;
-        }
-
-        final int numAttemptsValue;
-        final int timeBetweenAttempts;
-        final TimeUnit timeBetweenAttemptsUnit;
-        synchronized (this) {
-            numAttemptsValue = attemptsConfig.numAttempts;
-            timeBetweenAttempts = attemptsConfig.timeBetweenAttempts;
-            timeBetweenAttemptsUnit = attemptsConfig.timeBetweenAttempsUnit;
-        }
-
-        // try for a configured amount of attempts to retrieve the service address;
-        // the constructors guarantee serviceDiscovery is set when there is no fixed service
-        for (int i = 0; i < numAttemptsValue; i++) {
-
-            final DiscoverableService discoveredService = serviceDiscovery.getService();
-
-            // if we received an address
-            if (discoveredService != null) {
-                if (fixedServicePort > 0) {
-                    // use the discovered service name and host with the fixed service port
-                    final InetSocketAddress addr = InetSocketAddress.createUnresolved(discoveredService.getServiceAddress().getHostName(), fixedServicePort);
-                    return new DiscoverableServiceImpl(discoveredService.getServiceName(), addr);
-                }
-                return discoveredService;
-            }
-
-            // could not obtain service address, so sleep a bit
-            // NOTE(review): this also waits after the final failed attempt; kept
-            // because callers may rely on that delay between their own retries
-            try {
-                logger.debug("Locating Cluster Service '{}' Attempt: {} of {} failed. Trying again in {} {}.",
-                        serviceName, (i + 1), numAttemptsValue, timeBetweenAttempts, timeBetweenAttemptsUnit);
-                // honours the configured unit and avoids int overflow of a manual seconds-to-millis multiply
-                timeBetweenAttemptsUnit.sleep(timeBetweenAttempts);
-            } catch (final InterruptedException ie) {
-                // restore the interrupt status so the caller can observe it
-                Thread.currentThread().interrupt();
-                break;
-            }
-
-        }
-
-        return null;
-    }
-
-    /**
-     * @return the running state of the service discovery when one is used;
-     * otherwise the state of this instance's own running flag
-     */
-    public boolean isRunning() {
-        if (serviceDiscovery != null) {
-            return serviceDiscovery.isRunning();
-        } else {
-            return running.get();
-        }
-    }
-
-    /**
-     * Starts service discovery, if one is used, and marks this instance running.
-     *
-     * @throws IOException propagated from the service discovery
-     * @throws IllegalStateException if already running
-     */
-    public void start() throws IOException {
-
-        if (isRunning()) {
-            throw new IllegalStateException("Instance is already started.");
-        }
-
-        if (serviceDiscovery != null) {
-            serviceDiscovery.start();
-        }
-        running.set(true);
-    }
-
-    /**
-     * Stops service discovery, if one is used, and marks this instance stopped.
-     *
-     * @throws IOException propagated from the service discovery
-     * @throws IllegalStateException if not running
-     */
-    public void stop() throws IOException {
-
-        if (isRunning() == false) {
-            throw new IllegalStateException("Instance is already stopped.");
-        }
-
-        if (serviceDiscovery != null) {
-            serviceDiscovery.stop();
-        }
-        running.set(false);
-    }
-
-    /**
-     * Copies the values of the given configuration into this locator.
-     *
-     * @param config the configuration to copy; may not be null
-     * @throws IllegalArgumentException if config is null
-     */
-    public synchronized void setAttemptsConfig(final AttemptsConfig config) {
-        if (config == null) {
-            throw new IllegalArgumentException("Attempts configuration may not be null.");
-        }
-        this.attemptsConfig.numAttempts = config.numAttempts;
-        this.attemptsConfig.timeBetweenAttempts = config.timeBetweenAttempts;
-        this.attemptsConfig.timeBetweenAttempsUnit = config.timeBetweenAttempsUnit;
-    }
-
-    /**
-     * @return a copy of the current attempts configuration; changes to the
-     * returned object do not affect this locator
-     */
-    public synchronized AttemptsConfig getAttemptsConfig() {
-        final AttemptsConfig config = new AttemptsConfig();
-        config.numAttempts = this.attemptsConfig.numAttempts;
-        config.timeBetweenAttempts = this.attemptsConfig.timeBetweenAttempts;
-        config.timeBetweenAttempsUnit = this.attemptsConfig.timeBetweenAttempsUnit;
-        return config;
-    }
-
-    /**
-     * Holds the number of location attempts and the wait between them.
-     * Defaults to one attempt with a wait of one second.
-     */
-    public static class AttemptsConfig {
-
-        private int numAttempts = 1;
-
-        private int timeBetweenAttempts = 1;
-
-        private TimeUnit timeBetweenAttempsUnit = TimeUnit.SECONDS;
-
-        public int getNumAttempts() {
-            return numAttempts;
-        }
-
-        /**
-         * @param numAttempts the number of attempts; must be positive
-         * @throws IllegalArgumentException if numAttempts is not positive
-         */
-        public void setNumAttempts(int numAttempts) {
-            if (numAttempts <= 0) {
-                throw new IllegalArgumentException("Number of attempts must be positive: " + numAttempts);
-            }
-            this.numAttempts = numAttempts;
-        }
-
-        public TimeUnit getTimeBetweenAttemptsUnit() {
-            return timeBetweenAttempsUnit;
-        }
-
-        /**
-         * @param timeBetweenAttempsUnit the unit of the wait time; may not be null
-         * @throws IllegalArgumentException if the unit is null
-         */
-        public void setTimeBetweenAttempsUnit(TimeUnit timeBetweenAttempsUnit) {
-            // validate the argument itself; a null unit would otherwise fail later while waiting
-            if (timeBetweenAttempsUnit == null) {
-                throw new IllegalArgumentException("Time between attempts unit may not be null.");
-            }
-            this.timeBetweenAttempsUnit = timeBetweenAttempsUnit;
-        }
-
-        public int getTimeBetweenAttempts() {
-            return timeBetweenAttempts;
-        }
-
-        /**
-         * @param timeBetweenAttempts the wait time between attempts; must be positive
-         * @throws IllegalArgumentException if timeBetweenAttempts is not positive
-         */
-        public void setTimeBetweenAttempts(int timeBetweenAttempts) {
-            if (timeBetweenAttempts <= 0) {
-                throw new IllegalArgumentException("Time between attempts must be positive: " + timeBetweenAttempts);
-            }
-            this.timeBetweenAttempts = timeBetweenAttempts;
-        }
-
-    }
-}