You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/22 02:04:43 UTC
incubator-nifi git commit: NIFI-282: Refactoring to extract client
util
Repository: incubator-nifi
Updated Branches:
refs/heads/site-to-site-client 2aaed7021 -> 77fd8e5ec
NIFI-282: Refactoring to extract client util
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/77fd8e5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/77fd8e5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/77fd8e5e
Branch: refs/heads/site-to-site-client
Commit: 77fd8e5ec7e86095d6235deae91939db6412eeb1
Parents: 2aaed70
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jan 21 20:04:36 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jan 21 20:04:36 2015 -0500
----------------------------------------------------------------------
.../nifi/stream/io/LimitingInputStream.java | 111 ++++
.../stream/io/MinimumLengthInputStream.java | 93 +++
.../org/apache/nifi/remote/Transaction.java | 44 ++
.../nifi/remote/client/SiteToSiteClient.java | 7 +-
.../nifi/remote/client/socket/SocketClient.java | 54 +-
.../remote/codec/StandardFlowFileCodec.java | 78 +--
.../remote/exception/ProtocolException.java | 4 +-
.../nifi/remote/protocol/ClientProtocol.java | 13 +-
.../protocol/socket/SocketClientProtocol.java | 618 +++++--------------
.../socket/SocketClientTransaction.java | 260 +++++++-
.../nifi/remote/util/StandardDataPacket.java | 50 ++
.../nifi/remote/StandardRemoteGroupPort.java | 2 +-
.../socket/SocketFlowFileServerProtocol.java | 20 +-
13 files changed, 758 insertions(+), 596 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
new file mode 100644
index 0000000..421d579
--- /dev/null
+++ b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
/**
 * An {@link InputStream} wrapper that allows at most {@code limit} bytes to be
 * consumed from the wrapped stream. Once the limit has been reached, every read
 * reports end-of-stream ({@code -1}) even if the wrapped stream has more data.
 *
 * <p>This class keeps a running byte count and is not safe for concurrent use.</p>
 */
public class LimitingInputStream extends InputStream {

    private final InputStream in;
    private final long limit;
    private long bytesRead = 0;

    // Value of bytesRead when mark() was last called, or -1 if mark() has not been called.
    private long markOffset = -1L;

    /**
     * @param in the stream to read from
     * @param limit the maximum number of bytes that may be consumed from {@code in}
     */
    public LimitingInputStream(final InputStream in, final long limit) {
        this.in = in;
        this.limit = limit;
    }

    /**
     * Reads a single byte unless the limit has already been reached.
     *
     * @return the byte read, or {@code -1} if the limit or the end of the wrapped stream was reached
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int read() throws IOException {
        if (bytesRead >= limit) {
            return -1;
        }

        final int val = in.read();
        if (val > -1) {
            bytesRead++;
        }
        return val;
    }

    /**
     * Reads up to {@code b.length} bytes, never exceeding the remaining allowance.
     *
     * @return the number of bytes read, or {@code -1} if the limit or the end of the wrapped stream was reached
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off}, never
     * exceeding the remaining allowance.
     *
     * @return the number of bytes read, or {@code -1} if the limit or the end of the wrapped stream was reached
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (bytesRead >= limit) {
            return -1;
        }

        final int maxToRead = (int) Math.min(len, limit - bytesRead);

        final int val = in.read(b, off, maxToRead);
        if (val > 0) {
            bytesRead += val;
        }
        return val;
    }

    /**
     * Skips up to {@code n} bytes, never skipping past the limit. Skipped bytes
     * count against the limit.
     *
     * @return the number of bytes actually skipped
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public long skip(final long n) throws IOException {
        final long skipped = in.skip(Math.min(n, limit - bytesRead));
        bytesRead += skipped;
        return skipped;
    }

    /**
     * @return the wrapped stream's estimate of available bytes, capped at the number
     *         of bytes this stream will still hand out before the limit is reached
     * @throws IOException if the wrapped stream fails
     */
    @Override
    public int available() throws IOException {
        // Never advertise bytes that read() would refuse to deliver.
        final long remaining = Math.max(0L, limit - bytesRead);
        return (int) Math.min(in.available(), remaining);
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    /**
     * Marks the wrapped stream and remembers how many bytes had been consumed so
     * that {@link #reset()} can restore the remaining allowance.
     */
    @Override
    public void mark(int readlimit) {
        in.mark(readlimit);
        markOffset = bytesRead;
    }

    @Override
    public boolean markSupported() {
        return in.markSupported();
    }

    /**
     * Resets the wrapped stream and rolls the consumed-byte count back to its
     * value at the last {@link #mark(int)}; without this, bytes re-read after a
     * reset would be counted twice against the limit.
     *
     * @throws IOException if the wrapped stream cannot be reset
     */
    @Override
    public void reset() throws IOException {
        in.reset();

        // Only roll back if mark() went through this wrapper; otherwise we do not
        // know where the wrapped stream was repositioned to.
        if (markOffset >= 0) {
            bytesRead = markOffset;
        }
    }

    /**
     * @return the maximum number of bytes this stream will hand out
     */
    public long getLimit() {
        return limit;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
----------------------------------------------------------------------
diff --git a/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
new file mode 100644
index 0000000..2e93599
--- /dev/null
+++ b/nifi/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/MinimumLengthInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
/**
 * An InputStream that will throw EOFException if the underlying InputStream runs out of data
 * before reaching the configured minimum amount of data.
 *
 * <p>Bytes are counted whether they are read or skipped. The count is rolled back on
 * {@link #reset()} so that re-read bytes are not counted twice.</p>
 */
public class MinimumLengthInputStream extends FilterInputStream {

    private final long minLength;
    private long consumedCount = 0L;

    // Value of consumedCount when mark() was last called, or -1 if mark() has not been called.
    private long markedCount = -1L;

    /**
     * @param in the stream to read from
     * @param minLength the number of bytes that must be consumed before end-of-stream is acceptable
     */
    public MinimumLengthInputStream(final InputStream in, final long minLength) {
        super(in);
        this.minLength = minLength;
    }

    /**
     * @return the next byte, or {@code -1} at end-of-stream once the minimum has been consumed
     * @throws EOFException if the stream ends before {@code minLength} bytes were consumed
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read() throws IOException {
        final int b = super.read();
        if (b < 0 && consumedCount < minLength) {
            throw new EOFException();
        }

        if (b >= 0) {
            consumedCount++;
        }

        return b;
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * @return the number of bytes read, or {@code -1} at end-of-stream once the minimum has been consumed
     * @throws EOFException if the stream ends before {@code minLength} bytes were consumed
     * @throws IOException if the underlying stream fails
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        final int num = super.read(b, off, len);

        if (num < 0 && consumedCount < minLength) {
            throw new EOFException();
        }

        if (num >= 0) {
            consumedCount += num;
        }

        return num;
    }

    /**
     * Skips up to {@code n} bytes. Because {@code skip} returning 0 does not by itself
     * signal end-of-stream, a single byte is read as a probe when nothing could be skipped.
     *
     * @return the number of bytes skipped; 0 if {@code n <= 0} or the stream is exhausted
     *         after the minimum has been consumed
     * @throws EOFException if the stream ends before {@code minLength} bytes were consumed
     * @throws IOException if the underlying stream fails
     */
    @Override
    public long skip(final long n) throws IOException {
        // Nothing requested: do not fall through to the probing read, which would
        // silently swallow a byte the caller never asked to skip.
        if (n <= 0) {
            return 0L;
        }

        long skipped = super.skip(n);
        if (skipped < 1) {
            final int b = super.read();
            if (b < 0) {
                // Genuine end-of-stream: enforce the minimum exactly as read() does.
                if (consumedCount < minLength) {
                    throw new EOFException();
                }
                return 0L;
            }
            skipped = 1;
        }

        consumedCount += skipped;
        return skipped;
    }

    /**
     * Marks the underlying stream and remembers the consumed-byte count at this position.
     */
    @Override
    public synchronized void mark(final int readlimit) {
        super.mark(readlimit);
        markedCount = consumedCount;
    }

    /**
     * Resets the underlying stream and restores the consumed-byte count recorded by
     * the last {@link #mark(int)}.
     *
     * @throws IOException if the underlying stream cannot be reset
     */
    @Override
    public synchronized void reset() throws IOException {
        super.reset();
        if (markedCount >= 0) {
            consumedCount = markedCount;
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
new file mode 100644
index 0000000..6c136fc
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.IOException;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+public interface Transaction {
+
+ void confirm() throws IOException;
+
+ void complete(boolean applyBackpressure) throws IOException;
+
+ void cancel() throws IOException;
+
+ void send(DataPacket dataPacket) throws IOException;
+
+ DataPacket receive() throws IOException;
+
+ TransactionState getState() throws IOException;
+
+ public enum TransactionState {
+ TRANSACTION_STARTED,
+ DATA_EXCHANGED,
+ TRANSACTION_CONFIRMED,
+ TRANSACTION_COMPLETED,
+ TRANSACTION_CANCELED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 34cb56a..164a63c 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -19,12 +19,11 @@ package org.apache.nifi.remote.client;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransferDirection;
public interface SiteToSiteClient extends Closeable {
- void send(DataPacket dataPacket) throws IOException;
-
- DataPacket receive() throws IOException;
+ Transaction createTransaction(TransferDirection direction) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index b81b425..88eb5e8 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -24,6 +24,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.RemoteDestination;
+import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.exception.HandshakeException;
@@ -65,7 +66,7 @@ public class SocketClient implements SiteToSiteClient {
@Override
- public void send(final DataPacket dataPacket) throws IOException {
+ public Transaction createTransaction(final TransferDirection direction) throws IOException {
final String portId = getPortIdentifier(TransferDirection.SEND);
if ( portId == null ) {
@@ -91,19 +92,58 @@ public class SocketClient implements SiteToSiteClient {
final EndpointConnectionState connectionState;
try {
- connectionState = pool.getEndpointConnectionState(remoteDestination, TransferDirection.SEND);
+ connectionState = pool.getEndpointConnectionState(remoteDestination, direction);
} catch (final ProtocolException | HandshakeException | PortNotRunningException | UnknownPortException e) {
throw new IOException(e);
}
+ final Transaction transaction = connectionState.getSocketClientProtocol().startTransaction(
+ connectionState.getPeer(), connectionState.getCodec(), direction);
- }
+ // Wrap the transaction in a new one that will return the EndpointConnectionState back to the pool whenever
+ // the transaction is either completed or canceled.
+ return new Transaction() {
+ @Override
+ public void confirm() throws IOException {
+ transaction.confirm();
+ }
- @Override
- public DataPacket receive() throws IOException {
- // TODO Auto-generated method stub
- return null;
+ @Override
+ public void complete(final boolean applyBackpressure) throws IOException {
+ try {
+ transaction.complete(applyBackpressure);
+ } finally {
+ pool.offer(connectionState);
+ }
+ }
+
+ @Override
+ public void cancel() throws IOException {
+ try {
+ transaction.cancel();
+ } finally {
+ pool.offer(connectionState);
+ }
+ }
+
+ @Override
+ public void send(final DataPacket dataPacket) throws IOException {
+ transaction.send(dataPacket);
+ }
+
+ @Override
+ public DataPacket receive() throws IOException {
+ return transaction.receive();
+ }
+
+ @Override
+ public TransactionState getState() throws IOException {
+ return transaction.getState();
+ }
+
+ };
}
+
@Override
public void close() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
index d18a4ee..6fd92de 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java
@@ -26,14 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
+import org.apache.nifi.stream.io.StreamUtils;
public class StandardFlowFileCodec implements FlowFileCodec {
public static final int MAX_NUM_ATTRIBUTES = 25000;
@@ -47,37 +45,26 @@ public class StandardFlowFileCodec implements FlowFileCodec {
}
@Override
- public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException {
+ public void encode(final DataPacket dataPacket, final OutputStream encodedOut) throws IOException {
final DataOutputStream out = new DataOutputStream(encodedOut);
- final Map<String, String> attributes = flowFile.getAttributes();
+ final Map<String, String> attributes = dataPacket.getAttributes();
out.writeInt(attributes.size());
for ( final Map.Entry<String, String> entry : attributes.entrySet() ) {
writeString(entry.getKey(), out);
writeString(entry.getValue(), out);
}
- out.writeLong(flowFile.getSize());
-
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- final byte[] buffer = new byte[8192];
- int len;
- while ( (len = in.read(buffer)) > 0 ) {
- encodedOut.write(buffer, 0, len);
- }
-
- encodedOut.flush();
- }
- });
+ out.writeLong(dataPacket.getSize());
- return flowFile;
+ final InputStream in = dataPacket.getData();
+ StreamUtils.copy(in, encodedOut);
+ encodedOut.flush();
}
@Override
- public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException {
+ public DataPacket decode(final InputStream stream) throws IOException, ProtocolException {
final DataInputStream in = new DataInputStream(stream);
final int numAttributes;
@@ -94,43 +81,16 @@ public class StandardFlowFileCodec implements FlowFileCodec {
throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes);
}
- try {
- final Map<String, String> attributes = new HashMap<>(numAttributes);
- for (int i=0; i < numAttributes; i++) {
- final String attrName = readString(in);
- final String attrValue = readString(in);
- attributes.put(attrName, attrValue);
- }
-
- final long numBytes = in.readLong();
-
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- int len;
- long size = 0;
- final byte[] buffer = new byte[8192];
-
- while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) {
- out.write(buffer, 0, len);
- size += len;
- }
-
- if ( size != numBytes ) {
- throw new EOFException("Expected " + numBytes + " bytes but received only " + size);
- }
- }
- });
-
- return flowFile;
- } catch (final EOFException e) {
- session.rollback();
-
- // we throw the general IOException here because we did not expect to hit EOFException
- throw e;
+ final Map<String, String> attributes = new HashMap<>(numAttributes);
+ for (int i=0; i < numAttributes; i++) {
+ final String attrName = readString(in);
+ final String attrValue = readString(in);
+ attributes.put(attrName, attrValue);
}
+
+ final long numBytes = in.readLong();
+
+ return new StandardDataPacket(attributes, stream, numBytes);
}
private void writeString(final String val, final DataOutputStream out) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
index 0f50b98..e12348a 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java
@@ -16,7 +16,9 @@
*/
package org.apache.nifi.remote.exception;
-public class ProtocolException extends Exception {
+import java.io.IOException;
+
+public class ProtocolException extends IOException {
private static final long serialVersionUID = 5763900324505818495L;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
index 51d3970..befbdaa 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerStatus;
+import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -50,17 +51,7 @@ public interface ClientProtocol extends VersionedRemoteResource {
- void startTransaction(Peer peer, TransferDirection direction) throws IOException, ProtocolException;
-
- void completeTransaction(boolean applyBackPressure) throws IOException, ProtocolException;
-
- void rollbackTransaction();
-
- // must be done within a transaction.
- void transferData(DataPacket dataPacket, FlowFileCodec codec) throws IOException, ProtocolException;
-
- // must be done within a transaction.
- DataPacket receiveData(FlowFileCodec codec) throws IOException, ProtocolException;
+ Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 58d26d4..b4d1e5d 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -20,16 +20,12 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.zip.CRC32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -42,18 +38,18 @@ import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.io.CompressionInputStream;
-import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.ClientProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
@@ -74,6 +70,8 @@ public class SocketClientProtocol implements ClientProtocol {
private boolean readyForFileTransfer = false;
private String transitUriPrefix = null;
private int timeoutMillis = 30000;
+
+ private SocketClientTransaction transaction;
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
@@ -236,11 +234,8 @@ public class SocketClientProtocol implements ClientProtocol {
}
- // TODO: move up to top with member variables
- private SocketClientTransaction transaction;
-
@Override
- public void startTransaction(final Peer peer, final TransferDirection direction) throws IOException, ProtocolException {
+ public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException {
if ( !handshakeComplete ) {
throw new IllegalStateException("Handshake has not been performed");
}
@@ -248,204 +243,29 @@ public class SocketClientProtocol implements ClientProtocol {
throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse);
}
- transaction = new SocketClientTransaction(peer, direction, useCompression);
-
- final DataOutputStream dos = transaction.getDataOutputStream();
- if ( direction == TransferDirection.RECEIVE ) {
- // Indicate that we would like to have some data
- RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
- dos.flush();
-
- final Response dataAvailableCode = Response.read(transaction.getDataInputStream());
- switch (dataAvailableCode.getCode()) {
- case MORE_DATA:
- logger.debug("{} {} Indicates that data is available", this, peer);
- transaction.setDataAvailable(true);
- break;
- case NO_MORE_DATA:
- logger.debug("{} No data available from {}", peer);
- transaction.setDataAvailable(false);
- return;
- default:
- throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
- }
-
- } else {
- // Indicate that we would like to have some data
- RequestType.SEND_FLOWFILES.writeRequestType(dos);
- dos.flush();
- }
+ return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec,
+ direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS));
}
-
- @Override
- public DataPacket receiveData(final FlowFileCodec codec) throws IOException, ProtocolException {
- if ( transaction == null ) {
- throw new IllegalStateException("Cannot receive data because no transaction has been started");
- }
-
- if ( transaction.getTransferDirection() == TransferDirection.SEND ) {
- throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
- }
- // if no data available, return null
- if ( !transaction.isDataAvailable() ) {
- return null;
- }
-
- final Peer peer = transaction.getPeer();
- logger.debug("{} Receiving data from {}", this, peer);
- final DataPacket packet = codec.decode(transaction.createCheckedInputStream());
-
- if ( packet != null ) {
- transaction.incrementTransferCount();
-
- // Determine if Peer will send us data or has no data to send us
- final DataInputStream dis = transaction.getDataInputStream();
- final Response dataAvailableCode = Response.read(dis);
- switch (dataAvailableCode.getCode()) {
- case MORE_DATA:
- logger.debug("{} {} Indicates that data is available", this, peer);
- transaction.setDataAvailable(true);
- break;
- case NO_MORE_DATA:
- logger.debug("{} No data available from {}", peer);
- transaction.setDataAvailable(false);
- break;
- default:
- throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
- }
- }
-
- return packet;
- }
- @Override
- public void transferData(final DataPacket dataPacket, final FlowFileCodec codec) throws IOException, ProtocolException {
- if ( transaction == null ) {
- throw new IllegalStateException("Cannot send data because no transaction has been started");
- }
-
- if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
- throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
- }
-
- final Peer peer = transaction.getPeer();
- logger.debug("{} Sending data to {}", this, peer);
-
- if ( transaction.getTransferCount() > 0 ) {
- ResponseCode.CONTINUE_TRANSACTION.writeResponse(transaction.getDataOutputStream());
- }
+ // TODO: Transaction should be pulled out into its own class.
+ // Flow of execution:
+ // - start transaction
+ // - send/receive data
+ // - confirm contents
+ // - complete / rollback
+ //
+ // - this class should validate transaction state before each step.
+ // We need to confirm transaction to ensure that data is correct. Yes, it is sent via TCP, which should ensure that the
+ // data is correct, but things happen. Humans make mistakes. There could easily be a bug on our end, for example. And this
+ // will ensure that we guard against that. It's a good defensive programming strategy.
+ public void confirmTransaction() throws IOException {
- final CheckedOutputStream checkedOutStream = transaction.createCheckedOutputStream();
- codec.encode(dataPacket, checkedOutStream);
-
- // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
- // Otherwise, do NOT close it because we don't want to close the underlying stream
- // (CompressionOutputStream will not close the underlying stream when it's closed)
- if ( useCompression ) {
- checkedOutStream.close();
- }
-
- transaction.incrementTransferCount();
}
- @Override
- public void completeTransaction(final boolean applyBackPressure) throws ProtocolException, IOException {
- final SocketClientTransaction transaction = this.transaction;
- this.transaction = null;
-
- if ( transaction == null ) {
- throw new IllegalStateException("Cannot complete transaction because no transaction has been started");
- }
-
- final Peer peer = transaction.getPeer();
-
- if ( transaction.getTransferDirection() == TransferDirection.RECEIVE ) {
- final boolean moreData = transaction.isDataAvailable();
- if ( moreData ) {
- throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
- }
-
- // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
- // to peer so that we can verify that the connection is still open. This is a two-phase commit,
- // which helps to prevent the chances of data duplication. Without doing this, we may commit the
- // session and then when we send the response back to the peer, the peer may have timed out and may not
- // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
- // Critical Section involved in this transaction so that rather than the Critical Section being the
- // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
- logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
- final String calculatedCRC = transaction.calculateCRC();
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), calculatedCRC);
-
- final Response confirmTransactionResponse = Response.read(transaction.getDataInputStream());
- logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-
- switch (confirmTransactionResponse.getCode()) {
- case CONFIRM_TRANSACTION:
- break;
- case BAD_CHECKSUM:
- throw new IOException(this + " Received a BadChecksum response from peer " + peer);
- default:
- throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
- }
-
- if ( applyBackPressure ) {
- // Confirm that we received the data and the peer can now discard it but that the peer should not
- // send any more data for a bit
- logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
- ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(transaction.getDataOutputStream());
- } else {
- // Confirm that we received the data and the peer can now discard it
- logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
- ResponseCode.TRANSACTION_FINISHED.writeResponse(transaction.getDataOutputStream());
- }
- } else {
- logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
- ResponseCode.FINISH_TRANSACTION.writeResponse(transaction.getDataOutputStream());
-
- final String calculatedCRC = transaction.calculateCRC();
-
- // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
- final Response transactionConfirmationResponse = Response.read(transaction.getDataInputStream());
- if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
- // Confirm checksum and echo back the confirmation.
- logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
- final String receivedCRC = transactionConfirmationResponse.getMessage();
-
- if ( versionNegotiator.getVersion() > 3 ) {
- if ( !receivedCRC.equals(calculatedCRC) ) {
- ResponseCode.BAD_CHECKSUM.writeResponse(transaction.getDataOutputStream());
- throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
- }
- }
-
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(transaction.getDataOutputStream(), "");
- } else {
- throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
- }
-
- final Response transactionResponse;
- try {
- transactionResponse = Response.read(transaction.getDataInputStream());
- } catch (final IOException e) {
- throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
- "It is unknown whether or not the peer successfully received/processed the data.", e);
- }
-
- logger.debug("{} Received {} from {}", this, transactionResponse, peer);
- if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
- peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
- } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
- throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
- }
- }
- }
-
-
- @Override
- public void rollbackTransaction() {
+ public void cancelTransaction() {
final SocketClientTransaction transaction = this.transaction;
this.transaction = null;
@@ -456,296 +276,134 @@ public class SocketClientProtocol implements ClientProtocol {
// TODO: IMPLEMENT
}
+
@Override
public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- if ( !handshakeComplete ) {
- throw new IllegalStateException("Handshake has not been performed");
- }
- if ( !readyForFileTransfer ) {
- throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse);
- }
-
- logger.debug("{} Receiving FlowFiles from {}", this, peer);
- final CommunicationsSession commsSession = peer.getCommunicationsSession();
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
- String userDn = commsSession.getUserDn();
- if ( userDn == null ) {
- userDn = "none";
- }
-
- // Indicate that we would like to have some data
- RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
- dos.flush();
-
- // Determine if Peer will send us data or has no data to send us
- final Response dataAvailableCode = Response.read(dis);
- switch (dataAvailableCode.getCode()) {
- case MORE_DATA:
- logger.debug("{} {} Indicates that data is available", this, peer);
- break;
- case NO_MORE_DATA:
- context.yield();
- logger.debug("{} No data available from {}", peer);
- return;
- default:
- throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
- }
-
- final StopWatch stopWatch = new StopWatch(true);
- final Set<FlowFile> flowFilesReceived = new HashSet<>();
- long bytesReceived = 0L;
- final CRC32 crc = new CRC32();
-
- // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data.
- boolean continueTransaction = true;
- String calculatedCRC = "";
- while (continueTransaction) {
- final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis;
- final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
-
- final long startNanos = System.nanoTime();
-
- final DataPacket dataPacket = codec.decode(checkedIn);
- FlowFile flowFile = session.create();
- flowFile = session.importFrom(dataPacket.getData(), flowFile);
- flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
-
- final long transmissionNanos = System.nanoTime() - startNanos;
- final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
-
- final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key());
- flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-
- final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
- session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis);
-
- session.transfer(flowFile, Relationship.ANONYMOUS);
- bytesReceived += flowFile.getSize();
- flowFilesReceived.add(flowFile);
- logger.debug("{} Received {} from {}", this, flowFile, peer);
-
- final Response transactionCode = Response.read(dis);
- switch (transactionCode.getCode()) {
- case CONTINUE_TRANSACTION:
- logger.trace("{} Received ContinueTransaction indicator from {}", this, peer);
- break;
- case FINISH_TRANSACTION:
- logger.trace("{} Received FinishTransaction indicator from {}", this, peer);
- continueTransaction = false;
- calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue());
- break;
- default:
- throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode);
- }
- }
-
- // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
- // to peer so that we can verify that the connection is still open. This is a two-phase commit,
- // which helps to prevent the chances of data duplication. Without doing this, we may commit the
- // session and then when we send the response back to the peer, the peer may have timed out and may not
- // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
- // Critical Section involved in this transaction so that rather than the Critical Section being the
- // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
- logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-
- final Response confirmTransactionResponse = Response.read(dis);
- logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-
- switch (confirmTransactionResponse.getCode()) {
- case CONFIRM_TRANSACTION:
- break;
- case BAD_CHECKSUM:
- session.rollback();
- throw new IOException(this + " Received a BadChecksum response from peer " + peer);
- default:
- throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
- }
-
- // Commit the session so that we have persisted the data
- session.commit();
-
- if ( context.getAvailableRelationships().isEmpty() ) {
- // Confirm that we received the data and the peer can now discard it but that the peer should not
- // send any more data for a bit
- logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
- ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
- } else {
- // Confirm that we received the data and the peer can now discard it
- logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
- ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
- }
-
- stopWatch.stop();
- final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
- final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
- final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
- final String dataSize = FormatUtils.formatDataSize(bytesReceived);
- logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
- this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
- }
+ final String userDn = peer.getCommunicationsSession().getUserDn();
+ final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE);
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final Set<FlowFile> flowFilesReceived = new HashSet<>();
+ long bytesReceived = 0L;
+
+ while (true) {
+ final long start = System.nanoTime();
+ final DataPacket dataPacket = transaction.receive();
+ if ( dataPacket == null ) {
+ break;
+ }
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+ flowFile = session.importFrom(dataPacket.getData(), flowFile);
+ final long receiveNanos = System.nanoTime() - start;
+
+ String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+ if ( sourceFlowFileIdentifier == null ) {
+ sourceFlowFileIdentifier = "<Unknown Identifier>";
+ }
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
- @Override
- public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- if ( !handshakeComplete ) {
- throw new IllegalStateException("Handshake has not been performed");
- }
- if ( !readyForFileTransfer ) {
- throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse);
- }
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ bytesReceived += dataPacket.getSize();
+ }
- FlowFile flowFile = session.get();
- if ( flowFile == null ) {
- return;
- }
+ // Confirm that what we received was the correct data.
+ transaction.confirm();
+
+ // Commit the session so that we have persisted the data
+ session.commit();
- logger.debug("{} Sending FlowFiles to {}", this, peer);
- final CommunicationsSession commsSession = peer.getCommunicationsSession();
- final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
- final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
- String userDn = commsSession.getUserDn();
- if ( userDn == null ) {
- userDn = "none";
- }
-
- // Indicate that we would like to have some data
- RequestType.SEND_FLOWFILES.writeRequestType(dos);
- dos.flush();
-
- final StopWatch stopWatch = new StopWatch(true);
- final CRC32 crc = new CRC32();
-
- long bytesSent = 0L;
- final Set<FlowFile> flowFilesSent = new HashSet<>();
- boolean continueTransaction = true;
- String calculatedCRC = "";
- final long startSendingNanos = System.nanoTime();
- while (continueTransaction) {
- final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos;
- logger.debug("{} Sending {} to {}", this, flowFile, peer);
-
- final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
-
- final long startNanos = System.nanoTime();
-
- // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
- final FlowFile toWrap = flowFile;
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- final DataPacket dataPacket = new DataPacket() {
- @Override
- public Map<String, String> getAttributes() {
- return toWrap.getAttributes();
- }
+ // We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships.
+ final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
- @Override
- public InputStream getData() {
- return in;
- }
+ transaction.complete(applyBackpressure);
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
- @Override
- public long getSize() {
- return toWrap.getSize();
- }
- };
-
- codec.encode(dataPacket, checkedOutStream);
- }
- });
-
- final long transferNanos = System.nanoTime() - startNanos;
- final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-
- // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
- // Otherwise, do NOT close it because we don't want to close the underlying stream
- // (CompressionOutputStream will not close the underlying stream when it's closed)
- if ( useCompression ) {
- checkedOutStream.close();
- }
-
- flowFilesSent.add(flowFile);
- bytesSent += flowFile.getSize();
- logger.debug("{} Sent {} to {}", this, flowFile, peer);
-
- final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
- session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
- session.remove(flowFile);
-
- final long sendingNanos = System.nanoTime() - startSendingNanos;
- if ( sendingNanos < BATCH_SEND_NANOS ) {
- flowFile = session.get();
- } else {
- flowFile = null;
- }
-
- continueTransaction = (flowFile != null);
- if ( continueTransaction ) {
- logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer);
- ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
- } else {
- logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
- ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-
- calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() );
- }
- }
-
- // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
- final Response transactionConfirmationResponse = Response.read(dis);
- if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
- // Confirm checksum and echo back the confirmation.
- logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
- final String receivedCRC = transactionConfirmationResponse.getMessage();
-
- if ( versionNegotiator.getVersion() > 3 ) {
- if ( !receivedCRC.equals(calculatedCRC) ) {
- ResponseCode.BAD_CHECKSUM.writeResponse(dos);
- session.rollback();
- throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
- }
- }
-
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
- } else {
- throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
- }
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate });
+ }
- final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+
+ @Override
+ public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
- final Response transactionResponse;
- try {
- transactionResponse = Response.read(dis);
- } catch (final IOException e) {
- logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
- " It is unknown whether or not the peer successfully received/processed the data." +
- " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
- this, peer, session, flowFileDescription);
- session.rollback();
- throw e;
- }
-
- logger.debug("{} Received {} from {}", this, transactionResponse, peer);
- if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
- peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS));
- } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
- throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
- }
-
- // consume input stream entirely, ignoring its contents. If we
- // don't do this, the Connection will not be returned to the pool
- stopWatch.stop();
- final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
- final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
- final String dataSize = FormatUtils.formatDataSize(bytesSent);
-
- session.commit();
-
- logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
- this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ try {
+ final String userDn = peer.getCommunicationsSession().getUserDn();
+ final long startSendingNanos = System.nanoTime();
+ final StopWatch stopWatch = new StopWatch(true);
+ long bytesSent = 0L;
+
+ final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND);
+
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ boolean continueTransaction = true;
+ while (continueTransaction) {
+ final long startNanos = System.nanoTime();
+ // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+ final FlowFile toWrap = flowFile;
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+ transaction.send(dataPacket);
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+ logger.debug("{} Sent {} to {}", this, flowFile, peer);
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+ session.remove(flowFile);
+
+ final long sendingNanos = System.nanoTime() - startSendingNanos;
+ if ( sendingNanos < BATCH_SEND_NANOS ) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ }
+
+ transaction.confirm();
+
+ // consume input stream entirely, ignoring its contents. If we
+ // don't do this, the Connection will not be returned to the pool
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+ session.commit();
+ transaction.complete(false);
+
+ final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ } catch (final Exception e) {
+ session.rollback();
+ throw e;
+ }
}
-
+
+
@Override
public VersionNegotiator getVersionNegotiator() {
return versionNegotiator;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 83522a5..129e5aa 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -19,74 +19,272 @@ package org.apache.nifi.remote.protocol.socket;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.remote.Peer;
+import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.remote.exception.ProtocolException;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.protocol.RequestType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class SocketClientTransaction {
- private final long startTime = System.nanoTime();
- private final CRC32 crc = new CRC32();
+public class SocketClientTransaction implements Transaction {
+ private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
- private final Peer peer;
+ private final CRC32 crc = new CRC32();
+ private final int protocolVersion;
+ private final FlowFileCodec codec;
private final DataInputStream dis;
private final DataOutputStream dos;
private final TransferDirection direction;
+ private final boolean compress;
+ private final Peer peer;
+ private final int penaltyMillis;
private boolean dataAvailable = false;
private int transfers = 0;
+ private TransactionState state;
- SocketClientTransaction(final Peer peer, final TransferDirection direction, final boolean useCompression) throws IOException {
+ SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec,
+ final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException {
+ this.protocolVersion = protocolVersion;
this.peer = peer;
+ this.codec = codec;
this.direction = direction;
this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
+ this.compress = useCompression;
+ this.state = TransactionState.TRANSACTION_STARTED;
+ this.penaltyMillis = penaltyMillis;
+
+ initialize();
}
- int getTransferCount() {
- return transfers;
+ // TODO: UPDATE STATE
+ /**
+ * Opens the exchange with the peer for this transaction's direction.
+ * For RECEIVE, writes the RECEIVE_FLOWFILES request and reads the peer's reply to record whether
+ * any data is available; for SEND, only writes the SEND_FLOWFILES request.
+ *
+ * @throws IOException if writing the request or reading the reply fails
+ * @throws ProtocolException if the reply to a RECEIVE request is neither MORE_DATA nor NO_MORE_DATA
+ */
+ private void initialize() throws IOException {
+ if ( direction == TransferDirection.RECEIVE ) {
+ // Indicate that we would like to have some data
+ RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case MORE_DATA:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ this.dataAvailable = true;
+ break;
+ case NO_MORE_DATA:
+ // two placeholders need two arguments: this transaction, then the peer
+ logger.debug("{} No data available from {}", this, peer);
+ this.dataAvailable = false;
+ return;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+
+ } else {
+ // Indicate that we would like to send some data
+ RequestType.SEND_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+ }
}
- void incrementTransferCount() {
- transfers++;
- }
- void setDataAvailable(final boolean available) {
- this.dataAvailable = available;
+ // TODO: UPDATE STATE
+ /**
+ * Reads the next DataPacket from the peer, folding the bytes read into this transaction's CRC32.
+ * After a packet is decoded, the peer's follow-up response is read to record whether more data follows.
+ * NOTE(review): the 'compress' flag is not consulted on this path - confirm whether decompression belongs here.
+ *
+ * @return the decoded packet, or null if the peer has no (more) data for this transaction
+ * @throws IOException if reading from the peer fails
+ * @throws ProtocolException if the peer's follow-up response is neither CONTINUE_TRANSACTION nor FINISH_TRANSACTION
+ * @throws IllegalStateException if the transaction state does not allow receiving, or this is a SEND transaction
+ */
+ @Override
+ public DataPacket receive() throws IOException {
+ if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+ }
+
+ if ( direction == TransferDirection.SEND ) {
+ throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+ }
+
+ // if no data available, return null
+ if ( !dataAvailable ) {
+ return null;
+ }
+
+ logger.debug("{} Receiving data from {}", this, peer);
+ final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc));
+
+ if ( packet != null ) {
+ transfers++;
+
+ // After each packet the sender states whether the transaction continues or is finished.
+ // These are the codes a sender writes between/after packets (see send() and confirm()),
+ // not the MORE_DATA / NO_MORE_DATA codes used to answer the initial request.
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case CONTINUE_TRANSACTION:
+ logger.debug("{} {} Indicates that data is available", this, peer);
+ this.dataAvailable = true;
+ break;
+ case FINISH_TRANSACTION:
+ // two placeholders need two arguments: this transaction, then the peer
+ logger.debug("{} No data available from {}", this, peer);
+ this.dataAvailable = false;
+ break;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
+ }
+
+ return packet;
}
- boolean isDataAvailable() {
- return dataAvailable;
- }
- TransferDirection getTransferDirection() {
- return direction;
+ // TODO: UPDATE STATE
+ /**
+ * Encodes the given packet to the peer, folding the bytes written into this transaction's CRC32.
+ * For every packet after the first, a CONTINUE_TRANSACTION response is written before the packet.
+ *
+ * @param dataPacket the packet to encode and send
+ * @throws IOException if writing to the peer fails
+ * @throws IllegalStateException if the transaction state does not allow sending, or this is a RECEIVE transaction
+ */
+ @Override
+ public void send(DataPacket dataPacket) throws IOException {
+ if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+ }
+
+ if ( transfers > 0 ) {
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+ }
+
+ logger.debug("{} Sending data to {}", this, peer);
+
+ final OutputStream out = new CheckedOutputStream(dos, crc);
+ codec.encode(dataPacket, out);
+
+ // No CompressionOutputStream is wrapped around dos here, so 'out' writes straight to dos. Closing 'out'
+ // would close dos too (CheckedOutputStream.close() closes the stream it wraps), so only flush it.
+ // NOTE(review): once a CompressionOutputStream is wired in, close that stream instead to force out its remaining bytes.
+ if ( compress ) {
+ out.flush();
+ }
+
+ transfers++;
}
- DataOutputStream getDataOutputStream() {
- return dos;
- }
- DataInputStream getDataInputStream() {
- return dis;
+ // TODO: UPDATE STATE
+ /**
+ * Cancels this transaction. Only the precondition check exists so far: nothing is sent to the peer
+ * and the state is left unchanged (see the TODO in the body).
+ *
+ * @throws IllegalStateException if the transaction is already canceled or completed
+ */
+ @Override
+ public void cancel() {
+ if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED ) {
+ throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
+ }
+
+ // TODO: implement
}
- CheckedInputStream createCheckedInputStream() {
- return new CheckedInputStream(dis, crc);
- }
- CheckedOutputStream createCheckedOutputStream() {
- return new CheckedOutputStream(dos, crc);
+ // TODO: UPDATE STATE
+ /**
+ * Finishes a confirmed transaction.
+ * For RECEIVE, tells the peer the data was received: TRANSACTION_FINISHED_BUT_DESTINATION_FULL when
+ * back pressure is requested, TRANSACTION_FINISHED otherwise.
+ * For SEND, reads the peer's final response; a "destination full" response penalizes the peer for
+ * penaltyMillis. The applyBackPressure argument is not used in the SEND direction.
+ * NOTE(review): within this class 'state' is only assigned in the constructor, so the guard below
+ * cannot pass until the state transitions flagged by the TODOs are implemented - confirm.
+ *
+ * @param applyBackPressure whether to ask the sending peer to hold off sending more data (RECEIVE only)
+ * @throws IOException if writing to or reading from the peer fails
+ * @throws ProtocolException if, when sending, the peer's final response is not a "transaction finished" code
+ * @throws IllegalStateException if the transaction state is not TRANSACTION_CONFIRMED
+ */
+ @Override
+ public void complete(boolean applyBackPressure) throws IOException {
+ if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+ throw new IllegalStateException("Cannot complete transaction because state is " + state +
+ "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ if ( applyBackPressure ) {
+ // Confirm that we received the data and the peer can now discard it but that the peer should not
+ // send any more data for a bit
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+ } else {
+ // Confirm that we received the data and the peer can now discard it
+ logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+ }
+ } else {
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(dis);
+ } catch (final IOException e) {
+ // Re-wrap with context: at this point we cannot tell whether the peer processed the data
+ throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+ "It is unknown whether or not the peer successfully received/processed the data.", e);
+ }
+
+ logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ // The data was accepted, but the destination is full: penalize the peer for penaltyMillis
+ peer.penalize(penaltyMillis);
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+ }
}
- Peer getPeer() {
- return peer;
+
+ // TODO: UPDATE STATE
+ /**
+ * Performs the confirmation round trip of the two-phase commit with the peer.
+ * For RECEIVE, sends CONFIRM_TRANSACTION carrying the CRC32 calculated over the received data and
+ * expects CONFIRM_TRANSACTION back.
+ * For SEND, sends FINISH_TRANSACTION, expects CONFIRM_TRANSACTION carrying the peer's CRC32, compares it
+ * with the locally calculated value (protocol versions above 3 only) and echoes CONFIRM_TRANSACTION.
+ * NOTE(review): within this class 'state' is only assigned in the constructor, so the guard below
+ * cannot pass until the state transitions flagged by the TODOs are implemented - confirm.
+ *
+ * @throws IOException if communication fails, the peer reports BAD_CHECKSUM, or the checksums differ when sending
+ * @throws ProtocolException if the peer replies with an unexpected response code
+ * @throws IllegalStateException if the state is not DATA_EXCHANGED, or, when receiving, the peer has
+ * indicated more data than has been consumed
+ */
+ @Override
+ public void confirm() throws IOException {
+ if ( state != TransactionState.DATA_EXCHANGED ) {
+ throw new IllegalStateException("Cannot confirm Transaction because state is " + state +
+ "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ if ( dataAvailable ) {
+ throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+ }
+
+ // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+ // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+ // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+ // session and then when we send the response back to the peer, the peer may have timed out and may not
+ // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+ // Critical Section involved in this transaction so that rather than the Critical Section being the
+ // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+ logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+ final String calculatedCRC = String.valueOf(crc.getValue());
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+ final Response confirmTransactionResponse = Response.read(dis);
+ logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+ switch (confirmTransactionResponse.getCode()) {
+ case CONFIRM_TRANSACTION:
+ break;
+ case BAD_CHECKSUM:
+ throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+ default:
+ throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ }
+ } else {
+ logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+
+ final String calculatedCRC = String.valueOf(crc.getValue());
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(dis);
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm checksum and echo back the confirmation.
+ logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ // CRC was not used before version 4
+ if ( protocolVersion > 3 ) {
+ if ( !receivedCRC.equals(calculatedCRC) ) {
+ // Tell the peer the checksums differ before failing locally
+ ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+ }
}
+
- String calculateCRC() {
- return String.valueOf(crc.getValue());
+ // TODO: UPDATE STATE
+ /**
+ * @return the current state of this transaction
+ */
+ @Override
+ public TransactionState getState() {
+ return state;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
new file mode 100644
index 0000000..bd1b50c
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.util;
+
+import java.io.InputStream;
+import java.util.Map;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.MinimumLengthInputStream;
+
+public class StandardDataPacket implements DataPacket {
+ // DataPacket implementation that holds an attribute map, a content stream, and the size value given for that content.
+ private final Map<String, String> attributes;
+ private final InputStream stream;
+ private final long size;
+ // Keeps the attribute map by reference (no copy is made) and wraps the supplied stream, passing size to both wrappers.
+ public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) {
+ this.attributes = attributes;
+ this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size); // NOTE(review): presumably caps reads at size and rejects a shorter stream -- confirm in the wrapper classes
+ this.size = size;
+ }
+ // Returns the same Map instance that was handed to the constructor.
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+ // Returns the wrapped stream built in the constructor, not the caller's original stream object.
+ public InputStream getData() {
+ return stream;
+ }
+ // Returns the size value supplied at construction.
+ public long getSize() {
+ return size;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 82d8206..a51cdba 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -157,7 +157,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
logger.error(message);
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
return;
- } catch (final ProtocolException | HandshakeException | IOException e) {
+ } catch (final HandshakeException | IOException e) {
final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
logger.error(message);
if ( logger.isDebugEnabled() ) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/77fd8e5e/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 887429c..d4b9c2f 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -40,6 +40,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RemoteResourceFactory;
@@ -53,8 +54,10 @@ import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
+import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
@@ -304,7 +307,16 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
final StopWatch transferWatch = new StopWatch(true);
- flowFile = codec.encode(flowFile, session, checkedOutputStream);
+
+ final FlowFile toSend = flowFile;
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+ codec.encode(dataPacket, checkedOutputStream);
+ }
+ });
+
final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
// need to close the CompressionOutputStream in order to force it write out any remaining bytes.
@@ -427,7 +439,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis;
final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
- FlowFile flowFile = codec.decode(checkedInputStream, session);
+ final DataPacket dataPacket = codec.decode(checkedInputStream);
+ FlowFile flowFile = session.create();
+ flowFile = session.importFrom(dataPacket.getData(), flowFile);
+ flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+
final long transferNanos = System.nanoTime() - startNanos;
final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());