You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/06 14:20:21 UTC
[3/6] incubator-nifi git commit: NIFI-282: Bug fixes;
documentation improvements; removed code marked as 'FOR TESTING'
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
new file mode 100644
index 0000000..2e93599
--- /dev/null
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * An InputStream that will throw EOFException if the underlying InputStream runs out of data before reaching the
+ * configured minimum amount of data
+ */
+public class MinimumLengthInputStream extends FilterInputStream {
+
+ private final long minLength;
+ private long consumedCount = 0L;
+
+ public MinimumLengthInputStream(final InputStream in, final long minLength) {
+ super(in);
+ this.minLength = minLength;
+ }
+
+
+ @Override
+ public int read() throws IOException {
+ final int b = super.read();
+ if ( b < 0 && consumedCount < minLength ) {
+ throw new EOFException();
+ }
+
+ if ( b >= 0 ) {
+ consumedCount++;
+ }
+
+ return b;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ return read(b, 0, b.length);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ final int num = super.read(b, off, len);
+
+ if ( num < 0 && consumedCount < minLength ) {
+ throw new EOFException();
+ }
+
+ if ( num >= 0 ) {
+ consumedCount += num;
+ }
+
+ return num;
+ }
+
+ @Override
+ public long skip(final long n) throws IOException {
+ long skipped = super.skip(n);
+ if ( skipped < 1 ) {
+ final int b = super.read();
+ if ( b >= 0 ) {
+ skipped = 1;
+ }
+ }
+
+ if ( skipped < 0 && consumedCount < minLength ) {
+ throw new EOFException();
+ }
+
+ if ( skipped >= 0 ) {
+ consumedCount += skipped;
+ }
+
+ return skipped;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/pom.xml b/nifi/nifi-commons/pom.xml
index ec0bb62..768dfd9 100644
--- a/nifi/nifi-commons/pom.xml
+++ b/nifi/nifi-commons/pom.xml
@@ -35,5 +35,6 @@
<module>nifi-web-utils</module>
<module>nifi-processor-utilities</module>
<module>nifi-write-ahead-log</module>
+ <module>nifi-site-to-site-client</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
index 4a75a34..3d9f3ac 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/pom.xml
@@ -74,6 +74,10 @@
<artifactId>nifi-site-to-site</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-site-to-site-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
index 0fcac8c..3a1dfb2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java
@@ -16,15 +16,17 @@
*/
package org.apache.nifi.cluster.manager;
-import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
-import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
-import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
-import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
+import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
-import org.apache.nifi.cluster.NodeInformant;
-import org.apache.nifi.cluster.event.Event;
+import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
+import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
@@ -33,11 +35,9 @@ import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.reporting.BulletinRepository;
-import java.util.List;
-import java.util.Set;
-
/**
* Defines the interface for a ClusterManager. The cluster manager is a
* threadsafe centralized manager for a cluster. Members of a cluster are nodes.
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 656759a..4d5455f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -63,11 +63,10 @@ import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.cluster.BulletinsPayload;
-import org.apache.nifi.cluster.ClusterNodeInformation;
import org.apache.nifi.cluster.HeartbeatPayload;
-import org.apache.nifi.cluster.NodeInformation;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextImpl;
import org.apache.nifi.cluster.event.Event;
@@ -117,6 +116,7 @@ import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
+import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
@@ -155,6 +155,8 @@ import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.remote.RemoteResourceManager;
import org.apache.nifi.remote.RemoteSiteListener;
import org.apache.nifi.remote.SocketRemoteSiteListener;
+import org.apache.nifi.remote.cluster.ClusterNodeInformation;
+import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
@@ -191,7 +193,6 @@ import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
import org.apache.nifi.web.util.WebUtils;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.DOMException;
@@ -203,8 +204,6 @@ import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
-import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
-
/**
* Provides a cluster manager implementation. The manager federates incoming
* HTTP client requests to the nodes' external API using the HTTP protocol. The
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
index d088c5c..4b4536e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
@@ -44,6 +44,10 @@
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-site-to-site-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
deleted file mode 100644
index 0092f7a..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-public class AdaptedNodeInformation {
-
- private String hostname;
- private Integer siteToSitePort;
- private int apiPort;
- private boolean isSiteToSiteSecure;
- private int totalFlowFiles;
-
- public String getHostname() {
- return hostname;
- }
-
- public void setHostname(String hostname) {
- this.hostname = hostname;
- }
-
- public Integer getSiteToSitePort() {
- return siteToSitePort;
- }
-
- public void setSiteToSitePort(Integer siteToSitePort) {
- this.siteToSitePort = siteToSitePort;
- }
-
- public int getApiPort() {
- return apiPort;
- }
-
- public void setApiPort(int apiPort) {
- this.apiPort = apiPort;
- }
-
- public boolean isSiteToSiteSecure() {
- return isSiteToSiteSecure;
- }
-
- public void setSiteToSiteSecure(boolean isSiteToSiteSecure) {
- this.isSiteToSiteSecure = isSiteToSiteSecure;
- }
-
- public int getTotalFlowFiles() {
- return totalFlowFiles;
- }
-
- public void setTotalFlowFiles(int totalFlowFiles) {
- this.totalFlowFiles = totalFlowFiles;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
deleted file mode 100644
index 5751c32..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
-
-@XmlRootElement
-public class ClusterNodeInformation {
-
- private Collection<NodeInformation> nodeInfo;
-
- private static final JAXBContext JAXB_CONTEXT;
-
- static {
- try {
- JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class);
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXBContext.", e);
- }
- }
-
- public ClusterNodeInformation() {
- this.nodeInfo = null;
- }
-
- public void setNodeInformation(final Collection<NodeInformation> nodeInfo) {
- this.nodeInfo = nodeInfo;
- }
-
- @XmlJavaTypeAdapter(NodeInformationAdapter.class)
- public Collection<NodeInformation> getNodeInformation() {
- return nodeInfo;
- }
-
- public void marshal(final OutputStream os) throws JAXBException {
- final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
- marshaller.marshal(this, os);
- }
-
- public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException {
- final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
- return (ClusterNodeInformation) unmarshaller.unmarshal(is);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
deleted file mode 100644
index 987ff65..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-public interface NodeInformant {
-
- ClusterNodeInformation getNodeInformation();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
deleted file mode 100644
index 848eb7e..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-public class NodeInformation {
-
- private final String hostname;
- private final Integer siteToSitePort;
- private final int apiPort;
- private final boolean isSiteToSiteSecure;
- private final int totalFlowFiles;
-
- public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort,
- final boolean isSiteToSiteSecure, final int totalFlowFiles) {
- this.hostname = hostname;
- this.siteToSitePort = siteToSitePort;
- this.apiPort = apiPort;
- this.isSiteToSiteSecure = isSiteToSiteSecure;
- this.totalFlowFiles = totalFlowFiles;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public int getAPIPort() {
- return apiPort;
- }
-
- public Integer getSiteToSitePort() {
- return siteToSitePort;
- }
-
- public boolean isSiteToSiteSecure() {
- return isSiteToSiteSecure;
- }
-
- public int getTotalFlowFiles() {
- return totalFlowFiles;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof NodeInformation)) {
- return false;
- }
-
- final NodeInformation other = (NodeInformation) obj;
- if (!hostname.equals(other.hostname)) {
- return false;
- }
- if (siteToSitePort == null && other.siteToSitePort != null) {
- return false;
- }
- if (siteToSitePort != null && other.siteToSitePort == null) {
- return false;
- } else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) {
- return false;
- }
- if (apiPort != other.apiPort) {
- return false;
- }
- if (isSiteToSiteSecure != other.isSiteToSiteSecure) {
- return false;
- }
- return true;
- }
-
- @Override
- public int hashCode() {
- return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0);
- }
-
- @Override
- public String toString() {
- return "Node[" + hostname + ":" + apiPort + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
deleted file mode 100644
index 630631f..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cluster;
-
-import javax.xml.bind.annotation.adapters.XmlAdapter;
-
-public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> {
-
- @Override
- public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception {
- return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles());
- }
-
- @Override
- public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception {
- final AdaptedNodeInformation adapted = new AdaptedNodeInformation();
- adapted.setHostname(nodeInformation.getHostname());
- adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort());
- adapted.setApiPort(nodeInformation.getAPIPort());
- adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure());
- adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles());
- return adapted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
index e0cca64..ac41cba 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.groups;
-import java.io.IOException;
import java.net.URI;
import java.util.Date;
import java.util.Set;
@@ -27,7 +26,7 @@ import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
public interface RemoteProcessGroup {
@@ -81,6 +80,8 @@ public interface RemoteProcessGroup {
void setYieldDuration(final String yieldDuration);
String getYieldDuration();
+
+ EndpointConnectionStatePool getConnectionPool();
/**
* Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min")
@@ -107,15 +108,6 @@ public interface RemoteProcessGroup {
String getCommunicationsTimeout();
/**
- * @return the port that the remote instance is listening on for
- * site-to-site communication, or <code>null</code> if the remote instance
- * is not configured to allow site-to-site communications.
- *
- * @throws IOException if unable to communicate with the remote instance
- */
- Integer getListeningPort() throws IOException;
-
- /**
* Indicates whether or not the RemoteProcessGroup is currently scheduled to
* transmit data
*
@@ -211,11 +203,6 @@ public interface RemoteProcessGroup {
*/
void removeNonExistentPort(final RemoteGroupPort port);
- /**
- *
- * @return @throws IOException
- */
- CommunicationsSession establishSiteToSiteConnection() throws IOException;
/**
* Called whenever RemoteProcessGroup is removed from the flow, so that any
@@ -232,24 +219,4 @@ public interface RemoteProcessGroup {
void verifyCanStopTransmitting();
void verifyCanUpdate();
-
- /**
- * Returns a set of PeerStatus objects that describe the different peers
- * that we can communicate with for this RemoteProcessGroup.
- *
- * If the destination is a cluster, this set will contain PeerStatuses for
- * each of the nodes in the cluster.
- *
- * If the destination is a standalone instance, this set will contain just a
- * PeerStatus for the destination.
- *
- * Once the PeerStatuses have been obtained, they may be cached by this
- * RemoteProcessGroup for some amount of time.
- *
- * If unable to obtain the PeerStatuses or no peer status has yet been
- * obtained, will return null.
- *
- * @return
- */
- Set<PeerStatus> getPeerStatuses();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java
deleted file mode 100644
index 2422fe1..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/Peer.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-
-public class Peer {
-
- private final CommunicationsSession commsSession;
- private final String url;
- private final String host;
- private long penalizationExpiration = 0L;
- private boolean closed = false;
-
- public Peer(final CommunicationsSession commsSession, final String url) {
- this.commsSession = commsSession;
- this.url = url;
-
- try {
- this.host = new URI(url).getHost();
- } catch (final Exception e) {
- throw new IllegalArgumentException("Invalid URL: " + url);
- }
- }
-
- public String getUrl() {
- return url;
- }
-
- public CommunicationsSession getCommunicationsSession() {
- return commsSession;
- }
-
- public void close() throws IOException {
- this.closed = true;
-
- // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
- commsSession.close();
- }
-
- public void penalize(final long millis) {
- penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis);
- }
-
- public boolean isPenalized() {
- return penalizationExpiration > System.currentTimeMillis();
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- public String getHost() {
- return host;
- }
-
- @Override
- public int hashCode() {
- return 8320 + url.hashCode();
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj == this) {
- return true;
- }
- if (!(obj instanceof Peer)) {
- return false;
- }
-
- final Peer other = (Peer) obj;
- return this.url.equals(other.url);
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("Peer[url=").append(url);
- if (closed) {
- sb.append(",CLOSED");
- } else if (isPenalized()) {
- sb.append(",PENALIZED");
- }
- sb.append("]");
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
deleted file mode 100644
index d1cb076..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public class PeerStatus {
-
- private final String hostname;
- private final int port;
- private final boolean secure;
- private final int numFlowFiles;
-
- public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) {
- this.hostname = hostname;
- this.port = port;
- this.secure = secure;
- this.numFlowFiles = numFlowFiles;
- }
-
- public String getHostname() {
- return hostname;
- }
-
- public int getPort() {
- return port;
- }
-
- public boolean isSecure() {
- return secure;
- }
-
- public int getFlowFileCount() {
- return numFlowFiles;
- }
-
- @Override
- public String toString() {
- return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]";
- }
-
- @Override
- public int hashCode() {
- return 9824372 + hostname.hashCode() + port;
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (obj == null) {
- return false;
- }
-
- if (!(obj instanceof PeerStatus)) {
- return false;
- }
-
- final PeerStatus other = (PeerStatus) obj;
- return port == other.port && hostname.equals(other.hostname);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
index d4ad374..f08277c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java
@@ -16,20 +16,28 @@
*/
package org.apache.nifi.remote;
+import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Port;
+import org.apache.nifi.controller.AbstractPort;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
-public interface RemoteGroupPort extends Port {
+public abstract class RemoteGroupPort extends AbstractPort implements Port, RemoteDestination {
- RemoteProcessGroup getRemoteProcessGroup();
+ public RemoteGroupPort(String id, String name, ProcessGroup processGroup, ConnectableType type, ProcessScheduler scheduler) {
+ super(id, name, processGroup, type, scheduler);
+ }
- TransferDirection getTransferDirection();
+ public abstract RemoteProcessGroup getRemoteProcessGroup();
- boolean isUseCompression();
+ public abstract TransferDirection getTransferDirection();
- void setUseCompression(boolean useCompression);
+ public abstract boolean isUseCompression();
- boolean getTargetExists();
+ public abstract void setUseCompression(boolean useCompression);
- boolean isTargetRunning();
+ public abstract boolean getTargetExists();
+
+ public abstract boolean isTargetRunning();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
deleted file mode 100644
index 56432d5..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public enum TransferDirection {
-
- SEND,
- RECEIVE;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
deleted file mode 100644
index bfccd98..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public interface VersionedRemoteResource {
-
- VersionNegotiator getVersionNegotiator();
-
- String getResourceName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
new file mode 100644
index 0000000..e46ff5c
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+public interface NodeInformant {
+
+ ClusterNodeInformation getNodeInformation();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
deleted file mode 100644
index b4206b3..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.codec;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
-
-/**
- * <p>
- * Provides a mechanism for encoding and decoding FlowFiles as streams so that
- * they can be transferred remotely.
- * </p>
- */
-public interface FlowFileCodec extends VersionedRemoteResource {
-
- /**
- * Returns a List of all versions that this codec is able to support, in the
- * order that they are preferred by the codec
- *
- * @return
- */
- public List<Integer> getSupportedVersions();
-
- /**
- * Encodes a FlowFile and its content as a single stream of data and writes
- * that stream to the output. If checksum is not null, it will be calculated
- * as the stream is read
- *
- * @param flowFile the FlowFile to encode
- * @param session a session that can be used to transactionally create and
- * transfer flow files
- * @param outStream the stream to write the data to
- *
- * @return the updated FlowFile
- *
- * @throws IOException
- */
- FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException;
-
- /**
- * Decodes the contents of the InputStream, interpreting the data to
- * determine the next FlowFile's attributes and content, as well as their
- * destinations. If not null, checksum will be used to calculate the
- * checksum as the data is read.
- *
- * @param stream an InputStream containing FlowFiles' contents, attributes,
- * and destinations
- * @param session
- *
- * @return the FlowFile that was created, or <code>null</code> if the stream
- * was out of data
- *
- * @throws IOException
- * @throws ProtocolException if the input is malformed
- */
- FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
deleted file mode 100644
index b61fc65..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class HandshakeException extends Exception {
-
- private static final long serialVersionUID = 178192341908726L;
-
- public HandshakeException(final String message) {
- super(message);
- }
-
- public HandshakeException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
deleted file mode 100644
index af0f467..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class PortNotRunningException extends Exception {
-
- private static final long serialVersionUID = -2790940982005516375L;
-
- public PortNotRunningException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
deleted file mode 100644
index 0f50b98..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class ProtocolException extends Exception {
-
- private static final long serialVersionUID = 5763900324505818495L;
-
- public ProtocolException(final String message, final Throwable cause) {
- super(message, cause);
- }
-
- public ProtocolException(final String message) {
- super(message);
- }
-
- public ProtocolException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
deleted file mode 100644
index e6a0fe7..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class UnknownPortException extends Exception {
-
- private static final long serialVersionUID = -2790940982005516375L;
-
- public UnknownPortException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
deleted file mode 100644
index 32274eb..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.remote.Peer;
-import org.apache.nifi.remote.PeerStatus;
-import org.apache.nifi.remote.VersionedRemoteResource;
-import org.apache.nifi.remote.codec.FlowFileCodec;
-import org.apache.nifi.remote.exception.HandshakeException;
-import org.apache.nifi.remote.exception.PortNotRunningException;
-import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.UnknownPortException;
-
-public interface ClientProtocol extends VersionedRemoteResource {
-
- void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
-
- Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
-
- FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
-
- void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
-
- void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
-
- void shutdown(Peer peer) throws IOException, ProtocolException;
-
- boolean isReadyForFileTransfer();
-
- /**
- * returns <code>true</code> if remote instance indicates that the port is
- * invalid
- *
- * @return
- * @throws IllegalStateException if a handshake has not successfully
- * completed
- */
- boolean isPortInvalid() throws IllegalStateException;
-
- /**
- * returns <code>true</code> if remote instance indicates that the port is
- * unknown
- *
- * @return
- * @throws IllegalStateException if a handshake has not successfully
- * completed
- */
- boolean isPortUnknown();
-
- /**
- * returns <code>true</code> if remote instance indicates that the port's
- * destination is full
- *
- * @return
- * @throws IllegalStateException if a handshake has not successfully
- * completed
- */
- boolean isDestinationFull();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
deleted file mode 100644
index d2e2946..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-public interface CommunicationsInput {
-
- InputStream getInputStream() throws IOException;
-
- long getBytesRead();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
deleted file mode 100644
index 95cab29..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public interface CommunicationsOutput {
-
- OutputStream getOutputStream() throws IOException;
-
- long getBytesWritten();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
deleted file mode 100644
index d009cec..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public interface CommunicationsSession extends Closeable {
-
- public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
-
- CommunicationsInput getInput();
-
- CommunicationsOutput getOutput();
-
- void setTimeout(int millis) throws IOException;
-
- int getTimeout() throws IOException;
-
- void setUri(String uri);
-
- String getUri();
-
- String getUserDn();
-
- void setUserDn(String dn);
-
- boolean isDataAvailable();
-
- long getBytesWritten();
-
- long getBytesRead();
-
- /**
- * Asynchronously interrupts this FlowFileCodec. Implementations must ensure
- * that they stop sending and receiving data as soon as possible after this
- * method has been called, even if doing so results in sending only partial
- * data to the peer. This will usually result in the peer throwing a
- * SocketTimeoutException.
- */
- void interrupt();
-
- /**
- * Returns <code>true</code> if the connection is closed, <code>false</code>
- * otherwise.
- *
- * @return
- */
- boolean isClosed();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
deleted file mode 100644
index 41334fe..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.protocol;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public enum RequestType {
-
- NEGOTIATE_FLOWFILE_CODEC,
- REQUEST_PEER_LIST,
- SEND_FLOWFILES,
- RECEIVE_FLOWFILES,
- SHUTDOWN;
-
- public void writeRequestType(final DataOutputStream dos) throws IOException {
- dos.writeUTF(name());
- }
-
- public static RequestType readRequestType(final DataInputStream dis) throws IOException {
- final String requestTypeVal = dis.readUTF();
- try {
- return RequestType.valueOf(requestTypeVal);
- } catch (final Exception e) {
- throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
index 0d18f2e..0118534 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java
@@ -18,13 +18,13 @@ package org.apache.nifi.remote.protocol;
import java.io.IOException;
-import org.apache.nifi.cluster.NodeInformant;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
index df5c845..1ff5fbe 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml
@@ -64,6 +64,10 @@
<artifactId>nifi-client-dto</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-site-to-site-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
</dependency>