You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/25 14:24:24 UTC

[2/3] incubator-nifi git commit: NIFI-282: moved artifacts from site-to-site to core-api, as they were moved erroneously from site-to-site-client to site-to-site but should have gone from site-to-site-client to core-api; updated documentation and bug fix

NIFI-282: moved artifacts from site-to-site to core-api, as they were moved erroneously from site-to-site-client to site-to-site but should have gone from site-to-site-client to core-api; updated documentation and bug fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b824c5a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b824c5a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b824c5a7

Branch: refs/heads/site-to-site-client
Commit: b824c5a7fb3c607180d3f04f62cb47bbf878ea16
Parents: 9c22530
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 23 09:03:33 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 23 09:03:33 2015 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/remote/Peer.java  |  10 +-
 .../org/apache/nifi/remote/Transaction.java     | 140 ++++++++++++++++++-
 .../socket/EndpointConnectionStatePool.java     |   4 +
 .../nifi/remote/client/socket/SocketClient.java |   8 +-
 .../remote/protocol/socket/ResponseCode.java    |   1 +
 .../protocol/socket/SocketClientProtocol.java   |  31 ----
 .../socket/SocketClientTransaction.java         |  15 +-
 .../nifi/remote/PortAuthorizationResult.java    |  25 ++++
 .../nifi/remote/cluster/NodeInformant.java      |  22 +++
 .../remote/exception/BadRequestException.java   |  30 ++++
 .../exception/NotAuthorizedException.java       |  26 ++++
 .../exception/RequestExpiredException.java      |  26 ++++
 .../nifi/remote/PortAuthorizationResult.java    |  25 ----
 .../nifi/remote/cluster/NodeInformant.java      |  22 ---
 .../remote/exception/BadRequestException.java   |  30 ----
 .../exception/NotAuthorizedException.java       |  26 ----
 .../exception/RequestExpiredException.java      |  26 ----
 .../socket/SocketFlowFileServerProtocol.java    |   4 +
 18 files changed, 296 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index e811c68..29af777 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.net.URI;
 
 import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
 
 public class Peer {
 
@@ -57,8 +59,12 @@ public class Peer {
     public void close() throws IOException {
         this.closed = true;
 
-        // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
-        commsSession.close();
+        // Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+        try {
+            StreamUtils.copy(commsSession.getInput().getInputStream(), new NullOutputStream());
+        } finally {
+            commsSession.close();
+        }
     }
 
     public void penalize(final long millis) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 852eea1..149d7d0 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -20,28 +20,160 @@ import java.io.IOException;
 
 import org.apache.nifi.remote.protocol.DataPacket;
 
+
+/**
+ * <p>
+ * Provides a transaction for performing site-to-site data transfers.
+ * </p>
+ * 
+ * <p>
+ * A Transaction is created by calling the 
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)} 
+ * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction
+ * can be used to either send or receive data but not both. A new Transaction must be created in order to perform the
+ * other operation.
+ * </p>
+ * 
+ * <p>
+ * The general flow of execution of a Transaction is as follows:
+ * <ol>
+ *      <li>Create the transaction as described above.</li>
+ *      <li>Send data via the {@link #send(DataPacket)} method or receive data via the {@link #receive()} method.</li>
+ *      <li>Confirm the transaction via the {@link #confirm()} method.</li>
+ *      <li>Either complete the transaction via the {@link #complete(boolean)} method or cancel the transaction
+ *          via the {@link #cancel(String)} method.</li>
+ * </ol>
+ * </p>
+ * 
+ * <p>
+ * It is important that the Transaction be terminated in order to free the resources held
+ * by the Transaction. If a Transaction is not terminated, its resources will not be freed and
+ * if the Transaction holds connections from a connection pool, the connections in that pool
+ * will eventually become exhausted. A Transaction is terminated by calling one of the following
+ * methods:
+ *  <ul>
+ *      <li>{@link #complete(boolean)}</li>
+ *      <li>{@link #cancel(String)}</li>
+ *      <li>{@link #error()}</li>
+ *  </ul>
+ * </p>
+ * 
+ * <p>
+ * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction
+ * is automatically closed via a call to {@link #error()}.
+ * </p>
+ */
 public interface Transaction {
 
+    /**
+     * Sends information to the remote NiFi instance.
+     * 
+     * @param dataPacket the data packet to send
+     * @throws IOException
+     */
+    void send(DataPacket dataPacket) throws IOException;
+    
+    /**
+     * Receives information from the remote NiFi instance.
+     * 
+     * @return the DataPacket received, or {@code null} if there is no more data to receive. 
+     * @throws IOException
+     */
+    DataPacket receive() throws IOException;
+
+    /**
+     * <p>
+     * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received.
+     * </p>
+     * 
+     * <p>
+     * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP),
+     * it is still required that we confirm the transaction before completing the transaction. This is done as
+     * a "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if
+     * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the
+     * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the
+     * sender and receiver will cancel the transaction automatically.
+     * </p>
+     * 
+     * @throws IOException
+     */
 	void confirm() throws IOException;
 	
+	/**
+	 * <p>
+	 * Completes the transaction and indicates to both the sender and receiver that the data transfer was
+	 * successful. If receiving data, this method can also optionally request that the sender back off sending
+	 * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
+	 * that the receiver is not ready to receive data and may not service another request in the short term.
+	 * </p>
+	 * 
+	 * @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
+	 * should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
+	 * this Transaction is SEND, then this argument is ignored.
+	 * 
+	 * @throws IOException
+	 */
 	void complete(boolean requestBackoff) throws IOException;
 	
-	void cancel() throws IOException;
+	/**
+	 * <p>
+	 * Cancels this transaction, indicating to the sender that the data has not been successfully received so that
+	 * the sender can retry or handle however is appropriate.
+	 * </p>
+	 * 
+	 * @param explanation an explanation to tell the other party why the transaction was canceled.
+	 * @throws IOException
+	 */
+	void cancel(final String explanation) throws IOException;
 	
-	void send(DataPacket dataPacket) throws IOException;
 	
-	DataPacket receive() throws IOException;
+	/**
+	 * <p>
+	 * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes
+	 * the Transaction. The underlying connection should not be returned to a connection pool in this case.
+	 * </p>
+	 */
+	void error();
+	
 	
+	/**
+	 * Returns the current state of the Transaction.
+	 * @return
+	 * @throws IOException
+	 */
 	TransactionState getState() throws IOException;
 	
-	void error();
 	
 	public enum TransactionState {
+	    /**
+	     * Transaction has been started but no data has been sent or received.
+	     */
 		TRANSACTION_STARTED,
+		
+		/**
+		 * Transaction has been started and data has been sent or received.
+		 */
 		DATA_EXCHANGED,
+		
+		/**
+		 * Data that has been transferred has been confirmed via its CRC. Transaction is
+		 * ready to be completed.
+		 */
 		TRANSACTION_CONFIRMED,
+		
+		/**
+		 * Transaction has been successfully completed.
+		 */
 		TRANSACTION_COMPLETED,
+		
+		/**
+		 * The Transaction has been canceled.
+		 */
 		TRANSACTION_CANCELED,
+		
+		/**
+		 * The Transaction ended in an error.
+		 */
 		ERROR;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index e21e2e8..e9edf43 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -665,6 +665,10 @@ public class EndpointConnectionStatePool {
         }
     }
     
+    public void terminate(final EndpointConnectionState state) {
+        cleanup(state.getSocketClientProtocol(), state.getPeer());
+    }
+    
     private void refreshPeers() {
         final PeerStatusCache existingCache = peerStatusCache;
         if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index a616da0..7cb1696 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -124,13 +124,13 @@ public class SocketClient implements SiteToSiteClient {
 			}
 
 			@Override
-			public void cancel() throws IOException {
+			public void cancel(final String explanation) throws IOException {
 				try {
-					transaction.cancel();
+					transaction.cancel(explanation);
 				} finally {
                     final EndpointConnectionState state = connectionStateRef.get();
                     if ( state != null ) {
-                        pool.offer(connectionState);
+                        pool.terminate(connectionState);
                         connectionStateRef.set(null);
                     }
 				}
@@ -143,7 +143,7 @@ public class SocketClient implements SiteToSiteClient {
 			    } finally {
                     final EndpointConnectionState state = connectionStateRef.get();
                     if ( state != null ) {
-                        pool.offer(connectionState);
+                        pool.terminate(connectionState);
                         connectionStateRef.set(null);
                     }
 			    }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
index 0e588cd..8860e73 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -40,6 +40,7 @@ public enum ResponseCode {
     CONFIRM_TRANSACTION(12, "Confirm Transaction", true),   // "Explanation" of this code is the checksum
     TRANSACTION_FINISHED(13, "Transaction Finished", false),
     TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
+    CANCEL_TRANSACTION(15, "Cancel Transaction", true),
     BAD_CHECKSUM(19, "Bad Checksum", false),
 
     // data availability indicators

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index b4d1e5d..2e3386b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -71,8 +71,6 @@ public class SocketClientProtocol implements ClientProtocol {
     private String transitUriPrefix = null;
     private int timeoutMillis = 30000;
 
-    private SocketClientTransaction transaction;
-    
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     
     public SocketClientProtocol() {
@@ -248,35 +246,6 @@ public class SocketClientProtocol implements ClientProtocol {
     }
 
 
-    
-    // TODO: Transaction should be pulled out into its own class.
-    //			Flow of execution:
-    //			- start transaction
-    //			- send/receive data
-    //			- confirm contents
-    // 			- complete / rollback
-    //
-    //			- this class should validate transaction state before each step.
-    // We need to confirm transaction to ensure that data is correct. Yes, it is sent via TCP, which should ensure that the
-    // data is correct, but things happen. Humans make mistakes. There could easily be a bug on our end, for example. And this
-    // will ensure that we guard against that. It's a good defensive programming strategy.
-    public void confirmTransaction() throws IOException {
-        
-    }
-    
-    
-    public void cancelTransaction() {
-        final SocketClientTransaction transaction = this.transaction;
-        this.transaction = null;
-        
-        if ( transaction == null ) {
-            throw new IllegalStateException("Cannot rollback transaction because no transaction has been started");
-        }
-        
-        // TODO: IMPLEMENT
-    }
-    
-    
     @Override
     public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
     	final String userDn = peer.getCommunicationsSession().getUserDn();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 33af053..8d747a9 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -183,14 +183,19 @@ public class SocketClientTransaction implements Transaction {
 	}
 	
 	
-	// TODO: UPDATE STATE
 	@Override
-	public void cancel() {
-		if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED ) {
+	public void cancel(final String explanation) throws IOException {
+		if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
 			throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
 		}
-		
-		// TODO: implement
+
+		try {
+		    ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
+		    state = TransactionState.TRANSACTION_CANCELED;
+		} catch (final IOException ioe) {
+		    error();
+		    throw ioe;
+		}
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
new file mode 100644
index 0000000..8f2603a
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public interface PortAuthorizationResult {
+
+    boolean isAuthorized();
+
+    String getExplanation();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
new file mode 100644
index 0000000..e46ff5c
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+public interface NodeInformant {
+
+    ClusterNodeInformation getNodeInformation();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
new file mode 100644
index 0000000..f6c2f4f
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class BadRequestException extends Exception {
+
+    private static final long serialVersionUID = -8034602852256106560L;
+
+    public BadRequestException(final String message) {
+        super(message);
+    }
+
+    public BadRequestException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
new file mode 100644
index 0000000..24ff3a5
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class NotAuthorizedException extends Exception {
+
+    private static final long serialVersionUID = 2952623568114035498L;
+
+    public NotAuthorizedException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
new file mode 100644
index 0000000..dd675b3
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Used to indicate that by the time the request was serviced, it had already
+ * expired
+ */
+public class RequestExpiredException extends Exception {
+
+    private static final long serialVersionUID = -7037025330562827852L;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
deleted file mode 100644
index 8f2603a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public interface PortAuthorizationResult {
-
-    boolean isAuthorized();
-
-    String getExplanation();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
deleted file mode 100644
index e46ff5c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.cluster;
-
-public interface NodeInformant {
-
-    ClusterNodeInformation getNodeInformation();
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
deleted file mode 100644
index f6c2f4f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class BadRequestException extends Exception {
-
-    private static final long serialVersionUID = -8034602852256106560L;
-
-    public BadRequestException(final String message) {
-        super(message);
-    }
-
-    public BadRequestException(final Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
deleted file mode 100644
index 24ff3a5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class NotAuthorizedException extends Exception {
-
-    private static final long serialVersionUID = 2952623568114035498L;
-
-    public NotAuthorizedException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
deleted file mode 100644
index dd675b3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-/**
- * Used to indicate that by the time the request was serviced, it had already
- * expired
- */
-public class RequestExpiredException extends Exception {
-
-    private static final long serialVersionUID = -7037025330562827852L;
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index d4b9c2f..12c234e 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -465,6 +465,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                     continueTransaction = false;
                     calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue());
                     break;
+                case CANCEL_TRANSACTION:
+                    logger.info("{} Received CancelTransaction indicator from {} with explanation {}", this, peer, transactionResponse.getMessage());
+                    session.rollback();
+                    return 0;
                 default:
                     throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse);
             }