You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/19 01:23:07 UTC
[4/5] incubator-nifi git commit: NIFI-282: Begin refactoring and
creating client
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
new file mode 100644
index 0000000..d18a4ee
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.exception.ProtocolException;
+
+public class StandardFlowFileCodec implements FlowFileCodec {
+ public static final int MAX_NUM_ATTRIBUTES = 25000;
+
+ public static final String DEFAULT_FLOWFILE_PATH = "./";
+
+ private final VersionNegotiator versionNegotiator;
+
+ public StandardFlowFileCodec() {
+ versionNegotiator = new StandardVersionNegotiator(1);
+ }
+
+ @Override
+ public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException {
+ final DataOutputStream out = new DataOutputStream(encodedOut);
+
+ final Map<String, String> attributes = flowFile.getAttributes();
+ out.writeInt(attributes.size());
+ for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
+ writeString(entry.getKey(), out);
+ writeString(entry.getValue(), out);
+ }
+
+ out.writeLong(flowFile.getSize());
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final byte[] buffer = new byte[8192];
+ int len;
+ while ( (len = in.read(buffer)) > 0 ) {
+ encodedOut.write(buffer, 0, len);
+ }
+
+ encodedOut.flush();
+ }
+ });
+
+ return flowFile;
+ }
+
+
+ @Override
+ public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException {
+ final DataInputStream in = new DataInputStream(stream);
+
+ final int numAttributes;
+ try {
+ numAttributes = in.readInt();
+ } catch (final EOFException e) {
+ // we're out of data.
+ return null;
+ }
+
+ // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will
+ // generally result in an OutOfMemoryError.
+ if ( numAttributes > MAX_NUM_ATTRIBUTES ) {
+ throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes);
+ }
+
+ try {
+ final Map<String, String> attributes = new HashMap<>(numAttributes);
+ for (int i=0; i < numAttributes; i++) {
+ final String attrName = readString(in);
+ final String attrValue = readString(in);
+ attributes.put(attrName, attrValue);
+ }
+
+ final long numBytes = in.readLong();
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ int len;
+ long size = 0;
+ final byte[] buffer = new byte[8192];
+
+ while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) {
+ out.write(buffer, 0, len);
+ size += len;
+ }
+
+ if ( size != numBytes ) {
+ throw new EOFException("Expected " + numBytes + " bytes but received only " + size);
+ }
+ }
+ });
+
+ return flowFile;
+ } catch (final EOFException e) {
+ session.rollback();
+
+ // we throw the general IOException here because we did not expect to hit EOFException
+ throw e;
+ }
+ }
+
+ private void writeString(final String val, final DataOutputStream out) throws IOException {
+ final byte[] bytes = val.getBytes("UTF-8");
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+
+
+ private String readString(final DataInputStream in) throws IOException {
+ final int numBytes = in.readInt();
+ final byte[] bytes = new byte[numBytes];
+ StreamUtils.fillBuffer(in, bytes, true);
+ return new String(bytes, "UTF-8");
+ }
+
+ @Override
+ public List<Integer> getSupportedVersions() {
+ return versionNegotiator.getSupportedVersions();
+ }
+
+ @Override
+ public VersionNegotiator getVersionNegotiator() {
+ return versionNegotiator;
+ }
+
+ @Override
+ public String toString() {
+ return "Standard FlowFile Codec, Version " + versionNegotiator.getVersion();
+ }
+
+ @Override
+ public String getResourceName() {
+ return "StandardFlowFileCodec";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
new file mode 100644
index 0000000..f6c2f4f
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class BadRequestException extends Exception {
+
+ private static final long serialVersionUID = -8034602852256106560L;
+
+ public BadRequestException(final String message) {
+ super(message);
+ }
+
+ public BadRequestException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
new file mode 100644
index 0000000..b61fc65
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class HandshakeException extends Exception {
+
+ private static final long serialVersionUID = 178192341908726L;
+
+ public HandshakeException(final String message) {
+ super(message);
+ }
+
+ public HandshakeException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
new file mode 100644
index 0000000..24ff3a5
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class NotAuthorizedException extends Exception {
+
+ private static final long serialVersionUID = 2952623568114035498L;
+
+ public NotAuthorizedException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
new file mode 100644
index 0000000..af0f467
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class PortNotRunningException extends Exception {
+
+ private static final long serialVersionUID = -2790940982005516375L;
+
+ public PortNotRunningException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
new file mode 100644
index 0000000..0f50b98
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class ProtocolException extends Exception {
+
+ private static final long serialVersionUID = 5763900324505818495L;
+
+ public ProtocolException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public ProtocolException(final String message) {
+ super(message);
+ }
+
+ public ProtocolException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
new file mode 100644
index 0000000..dd675b3
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Used to indicate that by the time the request was serviced, it had already
+ * expired
+ */
+public class RequestExpiredException extends Exception {
+
+ private static final long serialVersionUID = -7037025330562827852L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
new file mode 100644
index 0000000..e6a0fe7
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class UnknownPortException extends Exception {
+
+ private static final long serialVersionUID = -2790940982005516375L;
+
+ public UnknownPortException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
new file mode 100644
index 0000000..926809c
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+import org.apache.nifi.remote.codec.FlowFileCodec;
+
+public class UnsupportedCodecException extends RuntimeException {
+ private static final long serialVersionUID = 198234789237L;
+
+ public UnsupportedCodecException(final String codecName) {
+ super("Codec " + codecName + " is not supported");
+ }
+
+ public UnsupportedCodecException(final FlowFileCodec codec, final int version) {
+ super("Codec " + codec.getClass().getName() + " does not support Version " + version);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
new file mode 100644
index 0000000..0822b6a
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+import org.apache.nifi.remote.AbstractCommunicationsSession;
+
+public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession {
+ private final SocketChannel channel;
+ private final SocketChannelInput request;
+ private final SocketChannelOutput response;
+ private int timeout = 30000;
+
+ public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException {
+ super(uri);
+ request = new SocketChannelInput(socketChannel);
+ response = new SocketChannelOutput(socketChannel);
+ channel = socketChannel;
+ socketChannel.configureBlocking(false);
+ }
+
+ @Override
+ public boolean isClosed() {
+ return !channel.isConnected();
+ }
+
+ @Override
+ public SocketChannelInput getInput() {
+ return request;
+ }
+
+ @Override
+ public SocketChannelOutput getOutput() {
+ return response;
+ }
+
+ @Override
+ public void setTimeout(final int millis) throws IOException {
+ request.setTimeout(millis);
+ response.setTimeout(millis);
+ this.timeout = millis;
+ }
+
+ @Override
+ public int getTimeout() throws IOException {
+ return timeout;
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ @Override
+ public boolean isDataAvailable() {
+ return request.isDataAvailable();
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return response.getBytesWritten();
+ }
+
+ @Override
+ public long getBytesRead() {
+ return request.getBytesRead();
+ }
+
+ @Override
+ public void interrupt() {
+ request.interrupt();
+ response.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
new file mode 100644
index 0000000..9e451fd
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.channels.SocketChannel;
+
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.remote.io.InterruptableInputStream;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+
+public class SocketChannelInput implements CommunicationsInput {
+ private final SocketChannelInputStream socketIn;
+ private final ByteCountingInputStream countingIn;
+ private final InputStream bufferedIn;
+ private final InterruptableInputStream interruptableIn;
+
+ public SocketChannelInput(final SocketChannel socketChannel) throws IOException {
+ this.socketIn = new SocketChannelInputStream(socketChannel);
+ countingIn = new ByteCountingInputStream(socketIn);
+ bufferedIn = new BufferedInputStream(countingIn);
+ interruptableIn = new InterruptableInputStream(bufferedIn);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return interruptableIn;
+ }
+
+ public void setTimeout(final int millis) {
+ socketIn.setTimeout(millis);
+ }
+
+ public boolean isDataAvailable() {
+ try {
+ return interruptableIn.available() > 0;
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public long getBytesRead() {
+ return countingIn.getBytesRead();
+ }
+
+ public void interrupt() {
+ interruptableIn.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
new file mode 100644
index 0000000..26c0164
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.SocketChannel;
+
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.remote.io.InterruptableOutputStream;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+public class SocketChannelOutput implements CommunicationsOutput {
+ private final SocketChannelOutputStream socketOutStream;
+ private final ByteCountingOutputStream countingOut;
+ private final OutputStream bufferedOut;
+ private final InterruptableOutputStream interruptableOut;
+
+ public SocketChannelOutput(final SocketChannel socketChannel) throws IOException {
+ socketOutStream = new SocketChannelOutputStream(socketChannel);
+ countingOut = new ByteCountingOutputStream(socketOutStream);
+ bufferedOut = new BufferedOutputStream(countingOut);
+ interruptableOut = new InterruptableOutputStream(bufferedOut);
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return interruptableOut;
+ }
+
+ public void setTimeout(final int timeout) {
+ socketOutStream.setTimeout(timeout);
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return countingOut.getBytesWritten();
+ }
+
+ public void interrupt() {
+ interruptableOut.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
new file mode 100644
index 0000000..dca1d84
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.AbstractCommunicationsSession;
+
+public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession {
+ private final SSLSocketChannel channel;
+ private final SSLSocketChannelInput request;
+ private final SSLSocketChannelOutput response;
+
+ public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) {
+ super(uri);
+ request = new SSLSocketChannelInput(channel);
+ response = new SSLSocketChannelOutput(channel);
+ this.channel = channel;
+ }
+
+ @Override
+ public SSLSocketChannelInput getInput() {
+ return request;
+ }
+
+ @Override
+ public SSLSocketChannelOutput getOutput() {
+ return response;
+ }
+
+ @Override
+ public void setTimeout(final int millis) throws IOException {
+ channel.setTimeout(millis);
+ }
+
+ @Override
+ public int getTimeout() throws IOException {
+ return channel.getTimeout();
+ }
+
+ @Override
+ public void close() throws IOException {
+ channel.close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return channel.isClosed();
+ }
+
+ @Override
+ public boolean isDataAvailable() {
+ try {
+ return request.isDataAvailable();
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return response.getBytesWritten();
+ }
+
+ @Override
+ public long getBytesRead() {
+ return request.getBytesRead();
+ }
+
+ @Override
+ public void interrupt() {
+ channel.interrupt();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + "[SSLSocketChannel=" + channel + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
new file mode 100644
index 0000000..60ef33f
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.remote.protocol.CommunicationsInput;
+
+public class SSLSocketChannelInput implements CommunicationsInput {
+ private final SSLSocketChannelInputStream in;
+ private final ByteCountingInputStream countingIn;
+ private final InputStream bufferedIn;
+
+ public SSLSocketChannelInput(final SSLSocketChannel socketChannel) {
+ in = new SSLSocketChannelInputStream(socketChannel);
+ countingIn = new ByteCountingInputStream(in);
+ this.bufferedIn = new BufferedInputStream(countingIn);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return bufferedIn;
+ }
+
+ public boolean isDataAvailable() throws IOException {
+ return bufferedIn.available() > 0;
+ }
+
+ @Override
+ public long getBytesRead() {
+ return countingIn.getBytesRead();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
new file mode 100644
index 0000000..dc3d68f
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.io.socket.ssl;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.remote.protocol.CommunicationsOutput;
+
+public class SSLSocketChannelOutput implements CommunicationsOutput {
+ private final OutputStream out;
+ private final ByteCountingOutputStream countingOut;
+
+ public SSLSocketChannelOutput(final SSLSocketChannel channel) {
+ countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel));
+ out = new BufferedOutputStream(countingOut);
+ }
+
+ @Override
+ public OutputStream getOutputStream() throws IOException {
+ return out;
+ }
+
+ @Override
+ public long getBytesWritten() {
+ return countingOut.getBytesWritten();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
new file mode 100644
index 0000000..32274eb
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.VersionedRemoteResource;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.PortNotRunningException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.exception.UnknownPortException;
+
+public interface ClientProtocol extends VersionedRemoteResource {
+
+ void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException;
+
+ Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException;
+
+ FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException;
+
+ void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException;
+
+ void shutdown(Peer peer) throws IOException, ProtocolException;
+
+ boolean isReadyForFileTransfer();
+
+ /**
+ * returns <code>true</code> if remote instance indicates that the port is
+ * invalid
+ *
+ * @return
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isPortInvalid() throws IllegalStateException;
+
+ /**
+ * returns <code>true</code> if remote instance indicates that the port is
+ * unknown
+ *
+ * @return
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isPortUnknown();
+
+ /**
+ * returns <code>true</code> if remote instance indicates that the port's
+ * destination is full
+ *
+ * @return
+ * @throws IllegalStateException if a handshake has not successfully
+ * completed
+ */
+ boolean isDestinationFull();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
new file mode 100644
index 0000000..d2e2946
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface CommunicationsInput {
+
+ InputStream getInputStream() throws IOException;
+
+ long getBytesRead();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
new file mode 100644
index 0000000..95cab29
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public interface CommunicationsOutput {
+
+ OutputStream getOutputStream() throws IOException;
+
+ long getBytesWritten();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
new file mode 100644
index 0000000..d009cec
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public interface CommunicationsSession extends Closeable {
+
+ public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'};
+
+ CommunicationsInput getInput();
+
+ CommunicationsOutput getOutput();
+
+ void setTimeout(int millis) throws IOException;
+
+ int getTimeout() throws IOException;
+
+ void setUri(String uri);
+
+ String getUri();
+
+ String getUserDn();
+
+ void setUserDn(String dn);
+
+ boolean isDataAvailable();
+
+ long getBytesWritten();
+
+ long getBytesRead();
+
+ /**
+ * Asynchronously interrupts this FlowFileCodec. Implementations must ensure
+ * that they stop sending and receiving data as soon as possible after this
+ * method has been called, even if doing so results in sending only partial
+ * data to the peer. This will usually result in the peer throwing a
+ * SocketTimeoutException.
+ */
+ void interrupt();
+
+ /**
+ * Returns <code>true</code> if the connection is closed, <code>false</code>
+ * otherwise.
+ *
+ * @return
+ */
+ boolean isClosed();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
new file mode 100644
index 0000000..41334fe
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public enum RequestType {
+
+ NEGOTIATE_FLOWFILE_CODEC,
+ REQUEST_PEER_LIST,
+ SEND_FLOWFILES,
+ RECEIVE_FLOWFILES,
+ SHUTDOWN;
+
+ public void writeRequestType(final DataOutputStream dos) throws IOException {
+ dos.writeUTF(name());
+ }
+
+ public static RequestType readRequestType(final DataInputStream dis) throws IOException {
+ final String requestTypeVal = dis.readUTF();
+ try {
+ return RequestType.valueOf(requestTypeVal);
+ } catch (final Exception e) {
+ throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
new file mode 100644
index 0000000..c4519cd
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+public enum HandshakeProperty {
+ GZIP,
+ PORT_IDENTIFIER,
+ REQUEST_EXPIRATION_MILLIS;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
new file mode 100644
index 0000000..eae1940
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+public class Response {
+ private final ResponseCode code;
+ private final String message;
+
+ private Response(final ResponseCode code, final String explanation) {
+ this.code = code;
+ this.message = explanation;
+ }
+
+ public ResponseCode getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public static Response read(final DataInputStream in) throws IOException, ProtocolException {
+ final ResponseCode code = ResponseCode.readCode(in);
+ final String message = code.containsMessage() ? in.readUTF() : null;
+ return new Response(code, message);
+ }
+
+ @Override
+ public String toString() {
+ return code + ": " + message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
new file mode 100644
index 0000000..0e588cd
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.remote.exception.ProtocolException;
+
+
+public enum ResponseCode {
+ RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of
+ // ResponseCode, so that we can indicate a 0 followed by some other bytes
+
+ // handshaking properties
+ PROPERTIES_OK(1, "Properties OK", false),
+ UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true),
+ ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true),
+ MISSING_PROPERTY(232, "Missing Property", true),
+
+ // transaction indicators
+ CONTINUE_TRANSACTION(10, "Continue Transaction", false),
+ FINISH_TRANSACTION(11, "Finish Transaction", false),
+ CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum
+ TRANSACTION_FINISHED(13, "Transaction Finished", false),
+ TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
+ BAD_CHECKSUM(19, "Bad Checksum", false),
+
+ // data availability indicators
+ MORE_DATA(20, "More Data Exists", false),
+ NO_MORE_DATA(21, "No More Data Exists", false),
+
+ // port state indicators
+ UNKNOWN_PORT(200, "Unknown Port", false),
+ PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true),
+ PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false),
+
+ // authorization
+ UNAUTHORIZED(240, "User Not Authorized", true),
+
+ // error indicators
+ ABORT(250, "Abort", true),
+ UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false),
+ END_OF_STREAM(255, "End of Stream", false);
+
+ private static final ResponseCode[] codeArray = new ResponseCode[256];
+
+ static {
+ for ( final ResponseCode responseCode : ResponseCode.values() ) {
+ codeArray[responseCode.getCode()] = responseCode;
+ }
+ }
+
+ private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R';
+ private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C';
+ private final int code;
+ private final byte[] codeSequence;
+ private final String description;
+ private final boolean containsMessage;
+
+ private ResponseCode(final int code, final String description, final boolean containsMessage) {
+ this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code};
+ this.code = code;
+ this.description = description;
+ this.containsMessage = containsMessage;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public byte[] getCodeSequence() {
+ return codeSequence;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+
+ public boolean containsMessage() {
+ return containsMessage;
+ }
+
+ public void writeResponse(final DataOutputStream out) throws IOException {
+ if ( containsMessage() ) {
+ throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation");
+ }
+
+ out.write(getCodeSequence());
+ out.flush();
+ }
+
+ public void writeResponse(final DataOutputStream out, final String explanation) throws IOException {
+ if ( !containsMessage() ) {
+ throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation");
+ }
+
+ out.write(getCodeSequence());
+ out.writeUTF(explanation);
+ out.flush();
+ }
+
+ static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException {
+ final int byte1 = in.read();
+ if ( byte1 < 0 ) {
+ throw new EOFException();
+ } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) {
+ throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
+ }
+
+ final int byte2 = in.read();
+ if ( byte2 < 0 ) {
+ throw new EOFException();
+ } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) {
+ throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode");
+ }
+
+ final int byte3 = in.read();
+ if ( byte3 < 0 ) {
+ throw new EOFException();
+ }
+
+ final ResponseCode responseCode = codeArray[byte3];
+ if (responseCode == null) {
+ throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code");
+ }
+ return responseCode;
+ }
+
+ public static ResponseCode fromSequence(final byte[] value) {
+ final int code = value[3] & 0xFF;
+ final ResponseCode responseCode = codeArray[code];
+ return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
new file mode 100644
index 0000000..2f4f755
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.RemoteResourceInitiator;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+import org.apache.nifi.remote.exception.HandshakeException;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.io.CompressionInputStream;
+import org.apache.nifi.remote.io.CompressionOutputStream;
+import org.apache.nifi.remote.protocol.ClientProtocol;
+import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketClientProtocol implements ClientProtocol {
+ private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+
+
+ private RemoteDestination destination;
+ private boolean useCompression;
+
+ private String commsIdentifier;
+ private boolean handshakeComplete = false;
+
+ private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
+
+ private Response handshakeResponse = null;
+ private boolean readyForFileTransfer = false;
+ private String transitUriPrefix = null;
+
+ private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+
+ public SocketClientProtocol() {
+ }
+
+ public void setDestination(final RemoteDestination destination) {
+ this.destination = destination;
+ this.useCompression = destination.isUseCompression();
+ }
+
+
+ @Override
+ public void handshake(final Peer peer) throws IOException, HandshakeException {
+ handshake(peer, destination.getIdentifier(), (int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+ }
+
+ public void handshake(final Peer peer, final String destinationId, final int timeoutMillis) throws IOException, HandshakeException {
+ if ( handshakeComplete ) {
+ throw new IllegalStateException("Handshake has already been completed");
+ }
+ commsIdentifier = UUID.randomUUID().toString();
+ logger.debug("{} handshaking with {}", this, peer);
+
+ final Map<HandshakeProperty, String> properties = new HashMap<>();
+ properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
+
+ if ( destinationId != null ) {
+ properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier());
+ }
+
+ properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
+
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ commsSession.setTimeout((int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ dos.writeUTF(commsIdentifier);
+
+ if ( versionNegotiator.getVersion() >= 3 ) {
+ dos.writeUTF(peer.getUrl());
+ transitUriPrefix = peer.getUrl();
+
+ if ( !transitUriPrefix.endsWith("/") ) {
+ transitUriPrefix = transitUriPrefix + "/";
+ }
+ }
+
+ dos.writeInt(properties.size());
+ for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+ dos.writeUTF(entry.getKey().name());
+ dos.writeUTF(entry.getValue());
+ }
+
+ dos.flush();
+
+ try {
+ handshakeResponse = Response.read(dis);
+ } catch (final ProtocolException e) {
+ throw new HandshakeException(e);
+ }
+
+ switch (handshakeResponse.getCode()) {
+ case PORT_NOT_IN_VALID_STATE:
+ case UNKNOWN_PORT:
+ case PORTS_DESTINATION_FULL:
+ break;
+ case PROPERTIES_OK:
+ readyForFileTransfer = true;
+ break;
+ default:
+ logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] {
+ this, handshakeResponse, peer});
+ peer.close();
+ throw new HandshakeException("Received unexpected response " + handshakeResponse);
+ }
+
+ logger.debug("{} Finished handshake with {}", this, peer);
+ handshakeComplete = true;
+ }
+
+ public boolean isReadyForFileTransfer() {
+ return readyForFileTransfer;
+ }
+
+ public boolean isPortInvalid() {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not completed successfully");
+ }
+ return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE;
+ }
+
+ public boolean isPortUnknown() {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not completed successfully");
+ }
+ return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
+ }
+
+ public boolean isDestinationFull() {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not completed successfully");
+ }
+ return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL;
+ }
+
+ @Override
+ public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+
+ logger.debug("{} Get Peer Statuses from {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
+ dos.flush();
+ final int numPeers = dis.readInt();
+ final Set<PeerStatus> peers = new HashSet<>(numPeers);
+ for (int i=0; i < numPeers; i++) {
+ final String hostname = dis.readUTF();
+ final int port = dis.readInt();
+ final boolean secure = dis.readBoolean();
+ final int flowFileCount = dis.readInt();
+ peers.add(new PeerStatus(hostname, port, secure, flowFileCount));
+ }
+
+ logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
+ return peers;
+ }
+
+ @Override
+ public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+
+ logger.debug("{} Negotiating Codec with {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
+
+ FlowFileCodec codec = new StandardFlowFileCodec();
+ try {
+ codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos);
+ } catch (HandshakeException e) {
+ throw new ProtocolException(e.toString());
+ }
+ logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession});
+
+ return codec;
+ }
+
+
+ @Override
+ public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+ if ( !readyForFileTransfer ) {
+ throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse);
+ }
+
+ logger.debug("{} Receiving FlowFiles from {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ String userDn = commsSession.getUserDn();
+ if ( userDn == null ) {
+ userDn = "none";
+ }
+
+ // Indicate that we would like to have some data
+ RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+
+ // Determine if Peer will send us data or has no data to send us
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case MORE_DATA:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ break;
+ case NO_MORE_DATA:
+ logger.debug("{} No data available from {}", peer);
+ return;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final Set<FlowFile> flowFilesReceived = new HashSet<>();
+ long bytesReceived = 0L;
+ final CRC32 crc = new CRC32();
+
+ // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data.
+ boolean continueTransaction = true;
+ String calculatedCRC = "";
+ while (continueTransaction) {
+ final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis;
+ final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
+
+ final long startNanos = System.nanoTime();
+ FlowFile flowFile = codec.decode(checkedIn, session);
+ final long transmissionNanos = System.nanoTime() - startNanos;
+ final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
+
+ final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key());
+ flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis);
+
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ bytesReceived += flowFile.getSize();
+ flowFilesReceived.add(flowFile);
+ logger.debug("{} Received {} from {}", this, flowFile, peer);
+
+ final Response transactionCode = Response.read(dis);
+ switch (transactionCode.getCode()) {
+ case CONTINUE_TRANSACTION:
+ logger.trace("{} Received ContinueTransaction indicator from {}", this, peer);
+ break;
+ case FINISH_TRANSACTION:
+ logger.trace("{} Received FinishTransaction indicator from {}", this, peer);
+ continueTransaction = false;
+ calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue());
+ break;
+ default:
+ throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode);
+ }
+ }
+
+ // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+ // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+ // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+ // session and then when we send the response back to the peer, the peer may have timed out and may not
+ // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+ // Critical Section involved in this transaction so that rather than the Critical Section being the
+ // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+ logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+ final Response confirmTransactionResponse = Response.read(dis);
+ logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+ switch (confirmTransactionResponse.getCode()) {
+ case CONFIRM_TRANSACTION:
+ break;
+ case BAD_CHECKSUM:
+ session.rollback();
+ throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+ default:
+ throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ }
+
+ // Commit the session so that we have persisted the data
+ session.commit();
+
+ if ( context.getAvailableRelationships().isEmpty() ) {
+ // Confirm that we received the data and the peer can now discard it but that the peer should not
+ // send any more data for a bit
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+ } else {
+ // Confirm that we received the data and the peer can now discard it
+ logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+ }
+
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ }
+
+ @Override
+ public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+ if ( !readyForFileTransfer ) {
+ throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse);
+ }
+
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ logger.debug("{} Sending FlowFiles to {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ String userDn = commsSession.getUserDn();
+ if ( userDn == null ) {
+ userDn = "none";
+ }
+
+ // Indicate that we would like to have some data
+ RequestType.SEND_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final CRC32 crc = new CRC32();
+
+ long bytesSent = 0L;
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ boolean continueTransaction = true;
+ String calculatedCRC = "";
+ final long startSendingNanos = System.nanoTime();
+ while (continueTransaction) {
+ final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos;
+ logger.debug("{} Sending {} to {}", this, flowFile, peer);
+
+ final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
+
+ final long startNanos = System.nanoTime();
+ flowFile = codec.encode(flowFile, session, checkedOutStream);
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+ // Otherwise, do NOT close it because we don't want to close the underlying stream
+ // (CompressionOutputStream will not close the underlying stream when it's closed)
+ if ( useCompression ) {
+ checkedOutStream.close();
+ }
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+ logger.debug("{} Sent {} to {}", this, flowFile, peer);
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+ session.remove(flowFile);
+
+ final long sendingNanos = System.nanoTime() - startSendingNanos;
+ if ( sendingNanos < BATCH_SEND_NANOS ) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ if ( continueTransaction ) {
+ logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+ } else {
+ logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+
+ calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() );
+ }
+ }
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(dis);
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm checksum and echo back the confirmation.
+ logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ if ( versionNegotiator.getVersion() > 3 ) {
+ if ( !receivedCRC.equals(calculatedCRC) ) {
+ ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+ session.rollback();
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+
+ final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(dis);
+ } catch (final IOException e) {
+ logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
+ " It is unknown whether or not the peer successfully received/processed the data." +
+ " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
+ this, peer, session, flowFileDescription);
+ session.rollback();
+ throw e;
+ }
+
+ logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+
+ // consume input stream entirely, ignoring its contents. If we
+ // don't do this, the Connection will not be returned to the pool
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+ session.commit();
+
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ }
+
+ @Override
+ public VersionNegotiator getVersionNegotiator() {
+ return versionNegotiator;
+ }
+
+ @Override
+ public void shutdown(final Peer peer) throws IOException {
+ readyForFileTransfer = false;
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ logger.debug("{} Shutting down with {}", this, peer);
+ // Indicate that we would like to have some data
+ RequestType.SHUTDOWN.writeRequestType(dos);
+ dos.flush();
+ }
+
+ @Override
+ public String getResourceName() {
+ return "SocketFlowFileProtocol";
+ }
+
+ @Override
+ public String toString() {
+ return "SocketClientProtocol[CommsID=" + commsIdentifier + "]";
+ }
+}