You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/25 14:24:24 UTC
[2/3] incubator-nifi git commit: NIFI-282: moved artifacts from
site-to-site to core-api, as they were moved erroneously from
site-to-site-client to site-to-site but should have gone from
site-to-site-client to core-api; updated documentation and bug fix
NIFI-282: moved artifacts from site-to-site to core-api, as they were moved erroneously from site-to-site-client to site-to-site but should have gone from site-to-site-client to core-api; updated documentation and bug fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b824c5a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b824c5a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b824c5a7
Branch: refs/heads/site-to-site-client
Commit: b824c5a7fb3c607180d3f04f62cb47bbf878ea16
Parents: 9c22530
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 23 09:03:33 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 23 09:03:33 2015 -0500
----------------------------------------------------------------------
.../main/java/org/apache/nifi/remote/Peer.java | 10 +-
.../org/apache/nifi/remote/Transaction.java | 140 ++++++++++++++++++-
.../socket/EndpointConnectionStatePool.java | 4 +
.../nifi/remote/client/socket/SocketClient.java | 8 +-
.../remote/protocol/socket/ResponseCode.java | 1 +
.../protocol/socket/SocketClientProtocol.java | 31 ----
.../socket/SocketClientTransaction.java | 15 +-
.../nifi/remote/PortAuthorizationResult.java | 25 ++++
.../nifi/remote/cluster/NodeInformant.java | 22 +++
.../remote/exception/BadRequestException.java | 30 ++++
.../exception/NotAuthorizedException.java | 26 ++++
.../exception/RequestExpiredException.java | 26 ++++
.../nifi/remote/PortAuthorizationResult.java | 25 ----
.../nifi/remote/cluster/NodeInformant.java | 22 ---
.../remote/exception/BadRequestException.java | 30 ----
.../exception/NotAuthorizedException.java | 26 ----
.../exception/RequestExpiredException.java | 26 ----
.../socket/SocketFlowFileServerProtocol.java | 4 +
18 files changed, 296 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index e811c68..29af777 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -20,6 +20,8 @@ import java.io.IOException;
import java.net.URI;
import org.apache.nifi.remote.protocol.CommunicationsSession;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
public class Peer {
@@ -57,8 +59,12 @@ public class Peer {
public void close() throws IOException {
this.closed = true;
- // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
- commsSession.close();
+ // Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer
+ try {
+ StreamUtils.copy(commsSession.getInput().getInputStream(), new NullOutputStream());
+ } finally {
+ commsSession.close();
+ }
}
public void penalize(final long millis) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 852eea1..149d7d0 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -20,28 +20,160 @@ import java.io.IOException;
import org.apache.nifi.remote.protocol.DataPacket;
+
+/**
+ * <p>
+ * Provides a transaction for performing site-to-site data transfers.
+ * </p>
+ *
+ * <p>
+ * A Transaction is created by calling the
+ * {@link org.apache.nifi.remote.client.SiteToSiteClient#createTransaction(TransferDirection) createTransaction(TransferDirection)}
+ * method of a {@link org.apache.nifi.remote.client.SiteToSiteClient SiteToSiteClient}. The resulting Transaction
+ * can be used to either send or receive data but not both. A new Transaction must be created in order perform the
+ * other operation.
+ * </p>
+ *
+ * <p>
+ * The general flow of execute of a Transaction is as follows:
+ * <ol>
+ * <li>Create the transaction as described above.</li>
+ * <li>Send data via the {@link #send(DataPacket)} method or receive data via the {@link #receive()} method.</li>
+ * <li>Confirm the transaction via the {@link #confirm()} method.</li>
+ * <li>Either complete the transaction via the {@link #complete(boolean)} method or cancel the transaction
+ * via the {@link #cancel()} method.</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * It is important that the Transaction be terminated in order to free the resources held
+ * by the Transaction. If a Transaction is not terminated, its resources will not be freed and
+ * if the Transaction holds connections from a connection pool, the connections in that pool
+ * will eventually become exhausted. A Transaction is terminated by calling one of the following
+ * methods:
+ * <ul>
+ * <li>{@link #complete(boolean)}</li>
+ * <li>{@link #cancel()}</li>
+ * <li>{@link #error()}</li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ * If at any point an IOException is thrown from one of the methods of the Transaction, that Transaction
+ * is automatically closed via a call to {@link #error()}.
+ * </p>
+ */
public interface Transaction {
+ /**
+ * Sends information to the remote NiFi instance.
+ *
+ * @param dataPacket the data packet to send
+ * @throws IOException
+ */
+ void send(DataPacket dataPacket) throws IOException;
+
+ /**
+ * Receives information from the remote NiFi instance.
+ *
+ * @return the DataPacket received, or {@code null} if there is no more data to receive.
+ * @throws IOException
+ */
+ DataPacket receive() throws IOException;
+
+ /**
+ * <p>
+ * Confirms the data that was sent or received by comparing CRC32's of the data sent and the data received.
+ * </p>
+ *
+ * <p>
+ * Even if the protocol being used to send the data is reliable and guarantees ordering of packets (such as TCP),
+ * it is still required that we confirm the transaction before completing the transaction. This is done as
+ * "safety net" or a defensive programming technique. Mistakes happen, and this mechanism helps to ensure that if
+ * a bug exists somewhere along the line that we do not end up sending or receiving corrupt data. If the
+ * CRC32 of the sender and the CRC32 of the receiver do not match, an IOException will be thrown and both the
+ * sender and receiver will cancel the transaction automatically.
+ * </p>
+ *
+ * @throws IOException
+ */
void confirm() throws IOException;
+ /**
+ * <p>
+ * Completes the transaction and indicates to both the sender and receiver that the data transfer was
+ * successful. If receiving data, this method can also optionally request that the sender back off sending
+ * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
+ * that the receiver is not ready to receive data and made not service another request in the short term.
+ * </p>
+ *
+ * @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
+ * should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
+ * this Transaction is SEND, then this argument is ignored.
+ *
+ * @throws IOException
+ */
void complete(boolean requestBackoff) throws IOException;
- void cancel() throws IOException;
+ /**
+ * <p>
+ * Cancels this transaction, indicating to the sender that the data has not been successfully received so that
+ * the sender can retry or handle however is appropriate.
+ * </p>
+ *
+ * @param explanation an explanation to tell the other party why the transaction was canceled.
+ * @throws IOException
+ */
+ void cancel(final String explanation) throws IOException;
- void send(DataPacket dataPacket) throws IOException;
- DataPacket receive() throws IOException;
+ /**
+ * <p>
+ * Sets the TransactionState of the Transaction to {@link TransactionState#ERROR}, and closes
+ * the Transaction. The underlying connection should not be returned to a connection pool in this case.
+ * </p>
+ */
+ void error();
+
+ /**
+ * Returns the current state of the Transaction.
+ * @return
+ * @throws IOException
+ */
TransactionState getState() throws IOException;
- void error();
public enum TransactionState {
+ /**
+ * Transaction has been started but no data has been sent or received.
+ */
TRANSACTION_STARTED,
+
+ /**
+ * Transaction has been started and data has been sent or received.
+ */
DATA_EXCHANGED,
+
+ /**
+ * Data that has been transferred has been confirmed via its CRC. Transaction is
+ * ready to be completed.
+ */
TRANSACTION_CONFIRMED,
+
+ /**
+ * Transaction has been successfully completed.
+ */
TRANSACTION_COMPLETED,
+
+ /**
+ * The Transaction has been canceled.
+ */
TRANSACTION_CANCELED,
+
+ /**
+ * The Transaction ended in an error.
+ */
ERROR;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index e21e2e8..e9edf43 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -665,6 +665,10 @@ public class EndpointConnectionStatePool {
}
}
+ public void terminate(final EndpointConnectionState state) {
+ cleanup(state.getSocketClientProtocol(), state.getPeer());
+ }
+
private void refreshPeers() {
final PeerStatusCache existingCache = peerStatusCache;
if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index a616da0..7cb1696 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -124,13 +124,13 @@ public class SocketClient implements SiteToSiteClient {
}
@Override
- public void cancel() throws IOException {
+ public void cancel(final String explanation) throws IOException {
try {
- transaction.cancel();
+ transaction.cancel(explanation);
} finally {
final EndpointConnectionState state = connectionStateRef.get();
if ( state != null ) {
- pool.offer(connectionState);
+ pool.terminate(connectionState);
connectionStateRef.set(null);
}
}
@@ -143,7 +143,7 @@ public class SocketClient implements SiteToSiteClient {
} finally {
final EndpointConnectionState state = connectionStateRef.get();
if ( state != null ) {
- pool.offer(connectionState);
+ pool.terminate(connectionState);
connectionStateRef.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
index 0e588cd..8860e73 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java
@@ -40,6 +40,7 @@ public enum ResponseCode {
CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum
TRANSACTION_FINISHED(13, "Transaction Finished", false),
TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false),
+ CANCEL_TRANSACTION(15, "Cancel Transaction", true),
BAD_CHECKSUM(19, "Bad Checksum", false),
// data availability indicators
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index b4d1e5d..2e3386b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -71,8 +71,6 @@ public class SocketClientProtocol implements ClientProtocol {
private String transitUriPrefix = null;
private int timeoutMillis = 30000;
- private SocketClientTransaction transaction;
-
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
public SocketClientProtocol() {
@@ -248,35 +246,6 @@ public class SocketClientProtocol implements ClientProtocol {
}
-
- // TODO: Transaction should be pulled out into its own class.
- // Flow of execution:
- // - start transaction
- // - send/receive data
- // - confirm contents
- // - complete / rollback
- //
- // - this class should validate transaction state before each step.
- // We need to confirm transaction to ensure that data is correct. Yes, it is sent via TCP, which should ensure that the
- // data is correct, but things happen. Humans make mistakes. There could easily be a bug on our end, for example. And this
- // will ensure that we guard against that. It's a good defensive programming strategy.
- public void confirmTransaction() throws IOException {
-
- }
-
-
- public void cancelTransaction() {
- final SocketClientTransaction transaction = this.transaction;
- this.transaction = null;
-
- if ( transaction == null ) {
- throw new IllegalStateException("Cannot rollback transaction because no transaction has been started");
- }
-
- // TODO: IMPLEMENT
- }
-
-
@Override
public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
final String userDn = peer.getCommunicationsSession().getUserDn();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index 33af053..8d747a9 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -183,14 +183,19 @@ public class SocketClientTransaction implements Transaction {
}
- // TODO: UPDATE STATE
@Override
- public void cancel() {
- if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED ) {
+ public void cancel(final String explanation) throws IOException {
+ if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) {
throw new IllegalStateException("Cannot cancel transaction because state is already " + state);
}
-
- // TODO: implement
+
+ try {
+ ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation);
+ state = TransactionState.TRANSACTION_CANCELED;
+ } catch (final IOException ioe) {
+ error();
+ throw ioe;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
new file mode 100644
index 0000000..8f2603a
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+public interface PortAuthorizationResult {
+
+ boolean isAuthorized();
+
+ String getExplanation();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
new file mode 100644
index 0000000..e46ff5c
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.cluster;
+
+public interface NodeInformant {
+
+ ClusterNodeInformation getNodeInformation();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
new file mode 100644
index 0000000..f6c2f4f
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class BadRequestException extends Exception {
+
+ private static final long serialVersionUID = -8034602852256106560L;
+
+ public BadRequestException(final String message) {
+ super(message);
+ }
+
+ public BadRequestException(final Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
new file mode 100644
index 0000000..24ff3a5
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+public class NotAuthorizedException extends Exception {
+
+ private static final long serialVersionUID = 2952623568114035498L;
+
+ public NotAuthorizedException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
new file mode 100644
index 0000000..dd675b3
--- /dev/null
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.exception;
+
+/**
+ * Used to indicate that by the time the request was serviced, it had already
+ * expired
+ */
+public class RequestExpiredException extends Exception {
+
+ private static final long serialVersionUID = -7037025330562827852L;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
deleted file mode 100644
index 8f2603a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote;
-
-public interface PortAuthorizationResult {
-
- boolean isAuthorized();
-
- String getExplanation();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
deleted file mode 100644
index e46ff5c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.cluster;
-
-public interface NodeInformant {
-
- ClusterNodeInformation getNodeInformation();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
deleted file mode 100644
index f6c2f4f..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class BadRequestException extends Exception {
-
- private static final long serialVersionUID = -8034602852256106560L;
-
- public BadRequestException(final String message) {
- super(message);
- }
-
- public BadRequestException(final Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
deleted file mode 100644
index 24ff3a5..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-public class NotAuthorizedException extends Exception {
-
- private static final long serialVersionUID = 2952623568114035498L;
-
- public NotAuthorizedException(final String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
deleted file mode 100644
index dd675b3..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.remote.exception;
-
-/**
- * Used to indicate that by the time the request was serviced, it had already
- * expired
- */
-public class RequestExpiredException extends Exception {
-
- private static final long serialVersionUID = -7037025330562827852L;
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b824c5a7/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index d4b9c2f..12c234e 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -465,6 +465,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
continueTransaction = false;
calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue());
break;
+ case CANCEL_TRANSACTION:
+ logger.info("{} Received CancelTransaction indicator from {} with explanation {}", this, peer, transactionResponse.getMessage());
+ session.rollback();
+ return 0;
default:
throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse);
}