You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/12 14:17:02 UTC
[1/2] incubator-nifi git commit: Refactored client and add javadocs
Repository: incubator-nifi
Updated Branches:
refs/heads/nifi-site-to-site-client 4ab5c308f -> d1e058cde
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 1e33e1f..da9d027 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -25,8 +26,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
@@ -34,28 +33,30 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.client.socket.EndpointConnection;
-import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
-import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.protocol.ClientProtocol;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardRemoteGroupPort extends RemoteGroupPort {
+ private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
public static final String USER_AGENT = "NiFi-Site-to-Site";
public static final String CONTENT_TYPE = "application/octet-stream";
@@ -71,11 +72,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final SSLContext sslContext;
private final TransferDirection transferDirection;
- private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
+ private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
- private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
- private final Lock interruptLock = new ReentrantLock();
- private boolean shutdown = false; // guarded by codecLock
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
@@ -112,16 +110,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public void shutdown() {
super.shutdown();
- interruptLock.lock();
- try {
- this.shutdown = true;
- } finally {
- interruptLock.unlock();
- }
- final EndpointConnectionPool pool = connectionPoolRef.get();
- if ( pool != null ) {
- pool.shutdown();
+ final SiteToSiteClient client = clientRef.get();
+ if ( client != null ) {
+ try {
+ client.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to properly shutdown Site-to-Site Client due to {}", ioe);
+ }
}
}
@@ -129,17 +125,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public void onSchedulingStart() {
super.onSchedulingStart();
- interruptLock.lock();
- try {
- this.shutdown = false;
- } finally {
- interruptLock.unlock();
- }
-
- final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(),
- remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
- sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
- connectionPoolRef.set(connectionPool);
+ final SiteToSiteClient client = new SiteToSiteClient.Builder()
+ .url(remoteGroup.getTargetUri().toString())
+ .portIdentifier(getIdentifier())
+ .sslContext(sslContext)
+ .eventReporter(remoteGroup.getEventReporter())
+ .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+ .build();
+ clientRef.set(client);
}
@@ -157,10 +150,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString();
- final EndpointConnectionPool connectionPool = connectionPoolRef.get();
- final EndpointConnection connection;
+ final SiteToSiteClient client = clientRef.get();
+ final Transaction transaction;
try {
- connection = connectionPool.getEndpointConnection(this, transferDirection);
+ transaction = client.createTransaction(transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@@ -186,95 +179,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
- if ( connection == null ) {
- logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
+ if ( transaction == null ) {
+ logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
context.yield();
return;
}
-
- FlowFileCodec codec = connection.getCodec();
- SocketClientProtocol protocol = connection.getSocketClientProtocol();
- final Peer peer = connection.getPeer();
- url = peer.getUrl();
-
+
try {
- interruptLock.lock();
- try {
- if ( shutdown ) {
- peer.getCommunicationsSession().interrupt();
- }
-
- activeCommsChannels.add(peer.getCommunicationsSession());
- } finally {
- interruptLock.unlock();
- }
-
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
- transferFlowFiles(peer, protocol, context, session, codec);
+ transferFlowFiles(transaction, context, session);
} else {
- final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
+ final int numReceived = receiveFlowFiles(transaction, context, session);
if ( numReceived == 0 ) {
context.yield();
}
}
- interruptLock.lock();
- try {
- if ( shutdown ) {
- peer.getCommunicationsSession().interrupt();
- }
-
- activeCommsChannels.remove(peer.getCommunicationsSession());
- } finally {
- interruptLock.unlock();
- }
-
session.commit();
-
- connection.setLastTimeUsed();
- connectionPool.offer(connection);
- } catch (final TransmissionDisabledException e) {
- cleanup(protocol, peer);
- session.rollback();
} catch (final Exception e) {
- connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
-
- final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
- logger.error(message);
+ final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString());
+ logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString());
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
- cleanup(protocol, peer);
-
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
session.rollback();
}
}
- private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
- if ( protocol != null && peer != null ) {
- try {
- protocol.shutdown(peer);
- } catch (final TransmissionDisabledException e) {
- // User disabled transmission.... do nothing.
- logger.debug(this + " Transmission Disabled by User");
- } catch (IOException e1) {
- }
- }
-
- if ( peer != null ) {
- try {
- peer.close();
- } catch (final TransmissionDisabledException e) {
- // User disabled transmission.... do nothing.
- logger.debug(this + " Transmission Disabled by User");
- } catch (IOException e1) {
- }
- }
- }
-
@Override
public String getYieldPeriod() {
// delegate yield duration to remote process group
@@ -282,12 +216,129 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
- private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- return protocol.transferFlowFiles(peer, context, session, codec);
+ private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return 0;
+ }
+
+ try {
+ final String userDn = transaction.getCommunicant().getDistinguishedName();
+ final long startSendingNanos = System.nanoTime();
+ final StopWatch stopWatch = new StopWatch(true);
+ long bytesSent = 0L;
+
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ boolean continueTransaction = true;
+ while (continueTransaction) {
+ final long startNanos = System.nanoTime();
+ // call codec.encode within a session callback so that we have the InputStream to read the FlowFile
+ final FlowFile toWrap = flowFile;
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+ transaction.send(dataPacket);
+ }
+ });
+
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+ logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
+
+ final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
+ session.remove(flowFile);
+
+ final long sendingNanos = System.nanoTime() - startSendingNanos;
+ if ( sendingNanos < BATCH_SEND_NANOS ) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ }
+
+ transaction.confirm();
+
+ // consume input stream entirely, ignoring its contents. If we
+ // don't do this, the Connection will not be returned to the pool
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+ session.commit();
+ transaction.complete();
+
+ final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
+
+ return flowFilesSent.size();
+ } catch (final Exception e) {
+ session.rollback();
+ throw e;
+ }
+
+
}
- private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- return protocol.receiveFlowFiles(peer, context, session, codec);
+ private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
+ final String userDn = transaction.getCommunicant().getDistinguishedName();
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final Set<FlowFile> flowFilesReceived = new HashSet<>();
+ long bytesReceived = 0L;
+
+ while (true) {
+ final long start = System.nanoTime();
+ final DataPacket dataPacket = transaction.receive();
+ if ( dataPacket == null ) {
+ break;
+ }
+
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+ flowFile = session.importFrom(dataPacket.getData(), flowFile);
+ final long receiveNanos = System.nanoTime() - start;
+
+ String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+ if ( sourceFlowFileIdentifier == null ) {
+ sourceFlowFileIdentifier = "<Unknown Identifier>";
+ }
+
+ final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier;
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
+ "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ bytesReceived += dataPacket.getSize();
+ }
+
+ // Confirm that what we received was the correct data.
+ transaction.confirm();
+
+ // Commit the session so that we have persisted the data
+ session.commit();
+
+ transaction.complete();
+
+ if ( !flowFilesReceived.isEmpty() ) {
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate });
+ }
+
+ return flowFilesReceived.size();
}
@Override
[2/2] incubator-nifi git commit: Refactored client and add javadocs
Posted by ma...@apache.org.
Refactored client and add javadocs
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d1e058cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d1e058cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d1e058cd
Branch: refs/heads/nifi-site-to-site-client
Commit: d1e058cde7b011a4daa0d574d392569460fc70ba
Parents: 4ab5c30
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 12 08:16:55 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 12 08:16:55 2015 -0500
----------------------------------------------------------------------
.../org/apache/nifi/remote/Communicant.java | 47 +++
.../main/java/org/apache/nifi/remote/Peer.java | 19 +-
.../nifi/remote/RemoteResourceInitiator.java | 9 +
.../org/apache/nifi/remote/Transaction.java | 27 +-
.../nifi/remote/TransactionCompletion.java | 63 +++
.../nifi/remote/client/SiteToSiteClient.java | 31 +-
.../remote/client/SiteToSiteClientConfig.java | 21 +-
.../client/socket/EndpointConnectionPool.java | 113 ++++--
.../nifi/remote/client/socket/SocketClient.java | 20 +-
.../protocol/socket/SocketClientProtocol.java | 7 +-
.../socket/SocketClientTransaction.java | 401 ++++++++++---------
.../SocketClientTransactionCompletion.java | 57 +++
.../client/socket/TestSiteToSiteClient.java | 16 +-
.../nifi/remote/SocketRemoteSiteListener.java | 8 +-
.../nifi/remote/StandardRemoteGroupPort.java | 263 +++++++-----
15 files changed, 720 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
new file mode 100644
index 0000000..ac2d498
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Communicant.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+/**
+ * Represents the remote entity that the client is communicating with
+ */
+public interface Communicant {
+ /**
+ * Returns the NiFi site-to-site URL for the remote NiFi instance
+ * @return
+ */
+ String getUrl();
+
+ /**
+ * The Host of the remote NiFi instance
+ * @return
+ */
+ String getHost();
+
+ /**
+ * The Port that the remote NiFi instance is listening on for site-to-site communications
+ * @return
+ */
+ int getPort();
+
+ /**
+ * The distinguished name that the remote NiFi instance has provided in its certificate if
+ * using secure communications, or <code>null</code> if the Distinguished Name is unknown
+ * @return
+ */
+ String getDistinguishedName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
index dda5ae3..3534f95 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java
@@ -23,12 +23,13 @@ import java.util.Map;
import org.apache.nifi.remote.protocol.CommunicationsSession;
-public class Peer {
+public class Peer implements Communicant {
private final CommunicationsSession commsSession;
private final String url;
private final String clusterUrl;
private final String host;
+ private final int port;
private final Map<String, Long> penaltyExpirationMap = new HashMap<>();
private boolean closed = false;
@@ -39,12 +40,15 @@ public class Peer {
this.clusterUrl = clusterUrl;
try {
- this.host = new URI(peerUrl).getHost();
+ final URI uri = new URI(peerUrl);
+ this.port = uri.getPort();
+ this.host = uri.getHost();
} catch (final Exception e) {
throw new IllegalArgumentException("Invalid URL: " + peerUrl);
}
}
+ @Override
public String getUrl() {
return url;
}
@@ -92,6 +96,7 @@ public class Peer {
return closed;
}
+ @Override
public String getHost() {
return host;
}
@@ -127,4 +132,14 @@ public class Peer {
sb.append("]");
return sb.toString();
}
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public String getDistinguishedName() {
+ return commsSession.getUserDn();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
index 8eb5d8d..f469724 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java
@@ -21,26 +21,33 @@ import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.nifi.remote.exception.HandshakeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RemoteResourceInitiator {
public static final int RESOURCE_OK = 20;
public static final int DIFFERENT_RESOURCE_VERSION = 21;
public static final int ABORT = 255;
+ private static final Logger logger = LoggerFactory.getLogger(RemoteResourceInitiator.class);
public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
// Write the classname of the RemoteStreamCodec, followed by its version
+ logger.debug("Negotiating resource; proposal is {}", resource);
dos.writeUTF(resource.getResourceName());
final VersionNegotiator negotiator = resource.getVersionNegotiator();
dos.writeInt(negotiator.getVersion());
dos.flush();
// wait for response from server.
+ logger.debug("Receiving response from remote instance");
final int statusCode = dis.read();
switch (statusCode) {
case RESOURCE_OK: // server accepted our proposal of codec name/version
+ logger.debug("Response was RESOURCE_OK");
return resource;
case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version
+ logger.debug("Response was DIFFERENT_RESOURCE_VERSION");
// Get server's preferred version
final int newVersion = dis.readInt();
@@ -56,8 +63,10 @@ public class RemoteResourceInitiator {
// Attempt negotiation of resource based on our new preferred version.
return initiateResourceNegotiation(resource, dis, dos);
case ABORT:
+ logger.debug("Response was ABORT");
throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default:
+ logger.debug("Response was {}; unable to negotiate codec", statusCode);
return null; // Unable to negotiate codec
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 9fb6147..51bf244 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -121,30 +121,16 @@ public interface Transaction {
void confirm() throws IOException;
/**
- * <p>
- * Completes the transaction and indicates to both the sender and receiver that the data transfer was
- * successful. If receiving data, this method can also optionally request that the sender back off sending
- * data for a short period of time. This is used, for instance, to apply backpressure or to notify the sender
- * that the receiver is not ready to receive data and made not service another request in the short term.
- * </p>
- *
- * @param requestBackoff if <code>true</code> and the TransferDirection is RECEIVE, indicates to sender that it
- * should back off sending data for a short period of time. If <code>false</code> or if the TransferDirection of
- * this Transaction is SEND, then this argument is ignored.
- *
- * @throws IOException
- */
- void complete(boolean requestBackoff) throws IOException;
-
- /**
* <p>
* Completes the transaction and indicates to both the sender and receiver that the data transfer was
* successful.
* </p>
*
* @throws IOException
+ *
+ * @return a TransactionCompletion that contains details about the Transaction
*/
- void complete() throws IOException;
+ TransactionCompletion complete() throws IOException;
/**
* <p>
@@ -174,6 +160,13 @@ public interface Transaction {
*/
TransactionState getState() throws IOException;
+ /**
+ * Returns a Communicant that represents the other side of this Transaction (i.e.,
+ * the remote NiFi instance)
+ * @return
+ */
+ Communicant getCommunicant();
+
public enum TransactionState {
/**
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
new file mode 100644
index 0000000..be5f73a
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/TransactionCompletion.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote;
+
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.protocol.DataPacket;
+
+
+/**
+ * A TransactionCompletion provides information about a {@link Transaction} that has completed successfully.
+ */
+public interface TransactionCompletion {
+
+ /**
+ * When a sending to a NiFi instance, the server may accept the content sent to it
+ * but indicate that its queues are full and that the client should backoff sending
+ * data for a bit. This method returns <code>true</code> if the server did in fact
+ * request that, <code>false</code> otherwise.
+ * @return
+ */
+ boolean isBackoff();
+
+ /**
+ * Returns the number of Data Packets that were sent to or received from the remote
+ * NiFi instance in the Transaction
+ * @return
+ */
+ int getDataPacketsTransferred();
+
+ /**
+ * Returns the number of bytes of DataPacket content that were sent to or received from
+ * the remote NiFI instance in the Transaction. Note that this is different than the number
+ * of bytes actually transferred between the client and server, as it does not take into
+ * account the attributes or protocol-specific information that is exchanged but rather
+ * takes into account only the data in the {@link InputStream} of the {@link DataPacket}
+ * @return
+ */
+ long getBytesTransferred();
+
+ /**
+ * Returns the amount of time that the Transaction took, from the time that the Transaction
+ * was created to the time that the Transaction was completed.
+ * @param timeUnit
+ * @return
+ */
+ long getDuration(TimeUnit timeUnit);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 47568fd..0591b5a 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -126,6 +126,7 @@ public interface SiteToSiteClient extends Closeable {
private String url;
private long timeoutNanos = TimeUnit.SECONDS.toNanos(30);
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
+ private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
private SSLContext sslContext;
private EventReporter eventReporter;
private File peerPersistenceFile;
@@ -163,6 +164,19 @@ public interface SiteToSiteClient extends Closeable {
}
/**
+ * Specifies the amount of time that a connection can remain idle in the connection pool before it
+ * is "expired" and shutdown. The default value is 30 seconds.
+ *
+ * @param timeout
+ * @param unit
+ * @return
+ */
+ public Builder idleExpiration(final long timeout, final TimeUnit unit) {
+ this.idleExpirationNanos = unit.toNanos(timeout);
+ return this;
+ }
+
+ /**
* If there is a problem communicating with a node (i.e., any node in the remote NiFi cluster
* or the remote instance of NiFi if it is standalone), specifies how long the client should
* wait before attempting to communicate with that node again. While a particular node is penalized,
@@ -327,6 +341,11 @@ public interface SiteToSiteClient extends Closeable {
}
@Override
+ public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+ return Builder.this.getIdleConnectionExpiration(timeUnit);
+ }
+
+ @Override
public SSLContext getSslContext() {
return Builder.this.getSslContext();
}
@@ -384,12 +403,22 @@ public interface SiteToSiteClient extends Closeable {
}
/**
- * Returns the communications timeout in nanoseconds
+ * Returns the communications timeout
* @return
*/
public long getTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(timeoutNanos, TimeUnit.NANOSECONDS);
}
+
+ /**
+ * Returns the amount of of time that a connection can remain idle in the connection
+ * pool before being shutdown
+ * @param timeUnit
+ * @return
+ */
+ public long getIdleConnectionExpiration(final TimeUnit timeUnit) {
+ return timeUnit.convert(idleExpirationNanos, TimeUnit.NANOSECONDS);
+ }
/**
* Returns the amount of time that a particular node will be ignored after a
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 37c48f8..d03ab3c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -37,6 +37,14 @@ public interface SiteToSiteClientConfig {
* @return
*/
long getTimeout(final TimeUnit timeUnit);
+
+ /**
+ * Returns the amount of time that a connection can remain idle before it is
+ * "expired" and shut down
+ * @param timeUnit
+ * @return
+ */
+ long getIdleConnectionExpiration(TimeUnit timeUnit);
/**
* Returns the amount of time that a particular node will be ignored after a
@@ -53,12 +61,6 @@ public interface SiteToSiteClientConfig {
SSLContext getSslContext();
/**
- * Returns the EventReporter that is to be used by clients to report events
- * @return
- */
- EventReporter getEventReporter();
-
- /**
* Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
* @return
*/
@@ -111,4 +113,11 @@ public interface SiteToSiteClientConfig {
* @return
*/
int getPreferredBatchCount();
+
+ /**
+ * Returns the EventReporter that is to be used by clients to report events
+ * @return
+ */
+ EventReporter getEventReporter();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 6869cca..43bc8e5 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -107,12 +107,13 @@ public class EndpointConnectionPool {
private volatile List<PeerStatus> peerStatuses;
private volatile long peerRefreshTime = 0L;
private volatile PeerStatusCache peerStatusCache;
- private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
+ private final Set<EndpointConnection> activeConnections = Collections.synchronizedSet(new HashSet<EndpointConnection>());
private final File peersFile;
private final EventReporter eventReporter;
private final SSLContext sslContext;
private final ScheduledExecutorService taskExecutor;
+ private final int idleExpirationMillis;
private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock();
private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@@ -124,12 +125,18 @@ public class EndpointConnectionPool {
private final Map<String, String> outputPortMap = new HashMap<>(); // map output port name to identifier
private volatile int commsTimeout;
-
- public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final EventReporter eventReporter, final File persistenceFile) {
- this(clusterUrl, commsTimeoutMillis, null, eventReporter, persistenceFile);
+ private volatile boolean shutdown = false;
+
+
+ public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
+ final EventReporter eventReporter, final File persistenceFile)
+ {
+ this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile);
}
- public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) {
+ public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis,
+ final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile)
+ {
try {
this.clusterUrl = new URI(clusterUrl);
} catch (final URISyntaxException e) {
@@ -147,6 +154,7 @@ public class EndpointConnectionPool {
this.peersFile = persistenceFile;
this.eventReporter = eventReporter;
this.commsTimeout = commsTimeoutMillis;
+ this.idleExpirationMillis = idleExpirationMillis;
Set<PeerStatus> recoveredStatuses;
if ( persistenceFile != null && persistenceFile.exists() ) {
@@ -225,19 +233,21 @@ public class EndpointConnectionPool {
// if we can't get an existing Connection, create one
if ( connection == null ) {
- logger.debug("No Connection available for Port {}; creating new Connection", remoteDestination.getIdentifier());
+ logger.debug("{} No Connection available for Port {}; creating new Connection", this, remoteDestination.getIdentifier());
protocol = new SocketClientProtocol();
protocol.setDestination(remoteDestination);
+ logger.debug("{} getting next peer status", this);
final PeerStatus peerStatus = getNextPeerStatus(direction);
+ logger.debug("{} next peer status = {}", this, peerStatus);
if ( peerStatus == null ) {
return null;
}
try {
+ logger.debug("{} Establishing site-to-site connection with {}", this, peerStatus);
commsSession = establishSiteToSiteConnection(peerStatus);
} catch (final IOException ioe) {
- // TODO: penalize peer status
penalize(peerStatus, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS));
throw ioe;
}
@@ -245,6 +255,7 @@ public class EndpointConnectionPool {
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
try {
+ logger.debug("{} Negotiating protocol", this);
RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos);
} catch (final HandshakeException e) {
try {
@@ -267,6 +278,7 @@ public class EndpointConnectionPool {
// perform handshake
try {
+ logger.debug("{} performing handshake", this);
protocol.handshake(peer);
// handle error cases
@@ -286,7 +298,9 @@ public class EndpointConnectionPool {
}
// negotiate the FlowFileCodec to use
+ logger.debug("{} negotiating codec", this);
codec = protocol.negotiateCodec(peer);
+ logger.debug("{} negotiated codec is {}", this, codec);
} catch (final PortNotRunningException | UnknownPortException e) {
throw e;
} catch (final Exception e) {
@@ -323,6 +337,7 @@ public class EndpointConnectionPool {
}
}
+ activeConnections.add(connection);
return connection;
}
@@ -338,7 +353,14 @@ public class EndpointConnectionPool {
return false;
}
- return connectionQueue.offer(endpointConnection);
+ activeConnections.remove(endpointConnection);
+ if ( shutdown ) {
+ terminate(endpointConnection);
+ return false;
+ } else {
+ endpointConnection.setLastTimeUsed();
+ return connectionQueue.offer(endpointConnection);
+ }
}
private void penalize(final PeerStatus status, final long penalizationMillis) {
@@ -393,27 +415,36 @@ public class EndpointConnectionPool {
}
}
+ private boolean isPeerRefreshNeeded(final List<PeerStatus> peerList) {
+ return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
+ }
+
private PeerStatus getNextPeerStatus(final TransferDirection direction) {
List<PeerStatus> peerList = peerStatuses;
- if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) ) {
+ if ( isPeerRefreshNeeded(peerList) ) {
peerRefreshLock.lock();
try {
- try {
- peerList = createPeerStatusList(direction);
- } catch (final Exception e) {
- final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
- logger.warn(message);
- if ( logger.isDebugEnabled() ) {
- logger.warn("", e);
+ // now that we have the lock, check again that we need to refresh (because another thread
+ // could have been refreshing while we were waiting for the lock).
+ peerList = peerStatuses;
+ if (isPeerRefreshNeeded(peerList)) {
+ try {
+ peerList = createPeerStatusList(direction);
+ } catch (final Exception e) {
+ final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString());
+ logger.warn(message);
+ if ( logger.isDebugEnabled() ) {
+ logger.warn("", e);
+ }
+
+ if ( eventReporter != null ) {
+ eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
+ }
}
- if ( eventReporter != null ) {
- eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
- }
+ this.peerStatuses = peerList;
+ peerRefreshTime = System.currentTimeMillis();
}
-
- this.peerStatuses = peerList;
- peerRefreshTime = System.currentTimeMillis();
} finally {
peerRefreshLock.unlock();
}
@@ -488,7 +519,10 @@ public class EndpointConnectionPool {
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
final String hostname = clusterUrl.getHost();
- final int port = getSiteToSitePort();
+ final Integer port = getSiteToSitePort();
+ if ( port == null ) {
+ throw new IOException("Remote instance of NiFi is not configured to allow site-to-site communications");
+ }
final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port);
final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, clusterUrl.toString());
@@ -667,7 +701,7 @@ public class EndpointConnectionPool {
distributionDescription.append("New Weighted Distribution of Nodes:");
for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) {
final double percentage = entry.getValue() * 100D / (double) destinations.size();
- distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles");
+ distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of data");
}
logger.info(distributionDescription.toString());
@@ -677,35 +711,36 @@ public class EndpointConnectionPool {
private void cleanupExpiredSockets() {
- final List<EndpointConnection> states = new ArrayList<>();
+ final List<EndpointConnection> connections = new ArrayList<>();
- EndpointConnection state;
- while ((state = connectionQueue.poll()) != null) {
+ EndpointConnection connection;
+ while ((connection = connectionQueue.poll()) != null) {
// If the socket has not been used in 10 seconds, shut it down.
- final long lastUsed = state.getLastTimeUsed();
- if ( lastUsed < System.currentTimeMillis() - 10000L ) {
+ final long lastUsed = connection.getLastTimeUsed();
+ if ( lastUsed < System.currentTimeMillis() - idleExpirationMillis ) {
try {
- state.getSocketClientProtocol().shutdown(state.getPeer());
+ connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
- new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} );
+ new Object[] {connection.getSocketClientProtocol(), connection.getPeer(), e} );
}
- cleanup(state.getSocketClientProtocol(), state.getPeer());
+ terminate(connection);
} else {
- states.add(state);
+ connections.add(connection);
}
}
- connectionQueue.addAll(states);
+ connectionQueue.addAll(connections);
}
public void shutdown() {
+ shutdown = true;
taskExecutor.shutdown();
peerTimeoutExpirations.clear();
-
- for ( final CommunicationsSession commsSession : activeCommsChannels ) {
- commsSession.interrupt();
+
+ for ( final EndpointConnection conn : activeConnections ) {
+ conn.getPeer().getCommunicationsSession().interrupt();
}
EndpointConnection state;
@@ -714,8 +749,8 @@ public class EndpointConnectionPool {
}
}
- public void terminate(final EndpointConnection state) {
- cleanup(state.getSocketClientProtocol(), state.getPeer());
+ public void terminate(final EndpointConnection connection) {
+ cleanup(connection.getSocketClientProtocol(), connection.getPeer());
}
private void refreshPeers() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 6fa934b..aae19b3 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -19,8 +19,10 @@ package org.apache.nifi.remote.client.socket;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
@@ -40,7 +42,8 @@ public class SocketClient implements SiteToSiteClient {
private volatile String portIdentifier;
public SocketClient(final SiteToSiteClientConfig config) {
- pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
this.config = config;
@@ -130,14 +133,9 @@ public class SocketClient implements SiteToSiteClient {
}
@Override
- public void complete() throws IOException {
- complete(false);
- }
-
- @Override
- public void complete(final boolean requestBackoff) throws IOException {
+ public TransactionCompletion complete() throws IOException {
try {
- transaction.complete(requestBackoff);
+ return transaction.complete();
} finally {
final EndpointConnection state = connectionStateRef.get();
if ( state != null ) {
@@ -187,7 +185,11 @@ public class SocketClient implements SiteToSiteClient {
public TransactionState getState() throws IOException {
return transaction.getState();
}
-
+
+ @Override
+ public Communicant getCommunicant() {
+ return transaction.getCommunicant();
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index e321663..390f4fc 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -317,10 +317,7 @@ public class SocketClientProtocol implements ClientProtocol {
// Commit the session so that we have persisted the data
session.commit();
- // We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships.
- final boolean applyBackpressure = context.getAvailableRelationships().isEmpty();
-
- transaction.complete(applyBackpressure);
+ transaction.complete();
logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
if ( !flowFilesReceived.isEmpty() ) {
@@ -397,7 +394,7 @@ public class SocketClientProtocol implements ClientProtocol {
final String dataSize = FormatUtils.formatDataSize(bytesSent);
session.commit();
- transaction.complete(false);
+ transaction.complete();
final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index cf8f9b2..b2fffed 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -25,8 +25,10 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
+import org.apache.nifi.remote.Communicant;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.Transaction;
+import org.apache.nifi.remote.TransactionCompletion;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
public class SocketClientTransaction implements Transaction {
private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class);
-
+ private final long creationNanoTime = System.nanoTime();
private final CRC32 crc = new CRC32();
private final int protocolVersion;
private final FlowFileCodec codec;
@@ -54,6 +56,7 @@ public class SocketClientTransaction implements Transaction {
private boolean dataAvailable = false;
private int transfers = 0;
+ private long contentBytes = 0;
private TransactionState state;
SocketClientTransaction(final int protocolVersion, final String destinationId, final Peer peer, final FlowFileCodec codec,
@@ -108,54 +111,59 @@ public class SocketClientTransaction implements Transaction {
@Override
public DataPacket receive() throws IOException {
try {
- if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
- throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
- }
-
- if ( direction == TransferDirection.SEND ) {
- throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
- }
-
- // if we already know there's no data, just return null
- if ( !dataAvailable ) {
- return null;
- }
-
- // if we have already received a packet, check if another is available.
- if ( transfers > 0 ) {
- // Determine if Peer will send us data or has no data to send us
- final Response dataAvailableCode = Response.read(dis);
- switch (dataAvailableCode.getCode()) {
- case CONTINUE_TRANSACTION:
- logger.debug("{} {} Indicates Transaction should continue", this, peer);
- this.dataAvailable = true;
- break;
- case FINISH_TRANSACTION:
- logger.debug("{} {} Indicates Transaction should finish", peer);
- this.dataAvailable = false;
- break;
- default:
- throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ try {
+ if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot receive data because Transaction State is " + state);
+ }
+
+ if ( direction == TransferDirection.SEND ) {
+ throw new IllegalStateException("Attempting to receive data but started a SEND Transaction");
+ }
+
+ // if we already know there's no data, just return null
+ if ( !dataAvailable ) {
+ return null;
+ }
+
+ // if we have already received a packet, check if another is available.
+ if ( transfers > 0 ) {
+ // Determine if Peer will send us data or has no data to send us
+ final Response dataAvailableCode = Response.read(dis);
+ switch (dataAvailableCode.getCode()) {
+ case CONTINUE_TRANSACTION:
+ logger.debug("{} {} Indicates Transaction should continue", this, peer);
+ this.dataAvailable = true;
+ break;
+ case FINISH_TRANSACTION:
+ logger.debug("{} {} Indicates Transaction should finish", peer);
+ this.dataAvailable = false;
+ break;
+ default:
+ throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+ }
}
- }
-
- // if no data available, return null
- if ( !dataAvailable ) {
- return null;
- }
-
- logger.debug("{} Receiving data from {}", this, peer);
- final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
- final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
-
- if ( packet == null ) {
- this.dataAvailable = false;
- } else {
- transfers++;
- }
-
- this.state = TransactionState.DATA_EXCHANGED;
- return packet;
+
+ // if no data available, return null
+ if ( !dataAvailable ) {
+ return null;
+ }
+
+ logger.debug("{} Receiving data from {}", this, peer);
+ final InputStream dataIn = compress ? new CompressionInputStream(dis) : dis;
+ final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc));
+
+ if ( packet == null ) {
+ this.dataAvailable = false;
+ } else {
+ transfers++;
+ contentBytes += packet.getSize();
+ }
+
+ this.state = TransactionState.DATA_EXCHANGED;
+ return packet;
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to receive data from " + peer + " due to " + ioe, ioe);
+ }
} catch (final Exception e) {
error();
throw e;
@@ -164,35 +172,40 @@ public class SocketClientTransaction implements Transaction {
@Override
- public void send(DataPacket dataPacket) throws IOException {
+ public void send(final DataPacket dataPacket) throws IOException {
try {
- if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
- throw new IllegalStateException("Cannot send data because Transaction State is " + state);
- }
-
- if ( direction == TransferDirection.RECEIVE ) {
- throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
- }
-
- if ( transfers > 0 ) {
- ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
- }
-
- logger.debug("{} Sending data to {}", this, peer);
-
- final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
- final OutputStream out = new CheckedOutputStream(dataOut, crc);
- codec.encode(dataPacket, out);
-
- // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
- // Otherwise, do NOT close it because we don't want to close the underlying stream
- // (CompressionOutputStream will not close the underlying stream when it's closed)
- if ( compress ) {
- out.close();
- }
-
- transfers++;
- this.state = TransactionState.DATA_EXCHANGED;
+ try {
+ if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) {
+ throw new IllegalStateException("Cannot send data because Transaction State is " + state);
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction");
+ }
+
+ if ( transfers > 0 ) {
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+ }
+
+ logger.debug("{} Sending data to {}", this, peer);
+
+ final OutputStream dataOut = compress ? new CompressionOutputStream(dos) : dos;
+ final OutputStream out = new CheckedOutputStream(dataOut, crc);
+ codec.encode(dataPacket, out);
+
+ // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+ // Otherwise, do NOT close it because we don't want to close the underlying stream
+ // (CompressionOutputStream will not close the underlying stream when it's closed)
+ if ( compress ) {
+ out.close();
+ }
+
+ transfers++;
+ contentBytes += dataPacket.getSize();
+ this.state = TransactionState.DATA_EXCHANGED;
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to send data to " + peer + " due to " + ioe, ioe);
+ }
} catch (final Exception e) {
error();
throw e;
@@ -211,59 +224,56 @@ public class SocketClientTransaction implements Transaction {
state = TransactionState.TRANSACTION_CANCELED;
} catch (final IOException ioe) {
error();
- throw ioe;
+ throw new IOException("Failed to send 'cancel transaction' message to " + peer + " due to " + ioe, ioe);
}
}
- @Override
- public void complete() throws IOException {
- complete(false);
- }
@Override
- public void complete(boolean requestBackoff) throws IOException {
+ public TransactionCompletion complete() throws IOException {
try {
- if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
- throw new IllegalStateException("Cannot complete transaction because state is " + state +
- "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
- }
-
- if ( direction == TransferDirection.RECEIVE ) {
- if ( transfers == 0 ) {
- state = TransactionState.TRANSACTION_COMPLETED;
- return;
- }
-
- if ( requestBackoff ) {
- // Confirm that we received the data and the peer can now discard it but that the peer should not
- // send any more data for a bit
- logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
- ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
- } else {
+ try {
+ if ( state != TransactionState.TRANSACTION_CONFIRMED ) {
+ throw new IllegalStateException("Cannot complete transaction because state is " + state +
+ "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED);
+ }
+
+ boolean backoff = false;
+ if ( direction == TransferDirection.RECEIVE ) {
+ if ( transfers == 0 ) {
+ state = TransactionState.TRANSACTION_COMPLETED;
+ return new SocketClientTransactionCompletion(false, 0, 0L, System.nanoTime() - creationNanoTime);
+ }
+
// Confirm that we received the data and the peer can now discard it
logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+
+ state = TransactionState.TRANSACTION_COMPLETED;
+ } else {
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(dis);
+ } catch (final IOException e) {
+ throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
+ "It is unknown whether or not the peer successfully received/processed the data.", e);
+ }
+
+ logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ peer.penalize(destinationId, penaltyMillis);
+ backoff = true;
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+
+ state = TransactionState.TRANSACTION_COMPLETED;
}
-
- state = TransactionState.TRANSACTION_COMPLETED;
- } else {
- final Response transactionResponse;
- try {
- transactionResponse = Response.read(dis);
- } catch (final IOException e) {
- throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " +
- "It is unknown whether or not the peer successfully received/processed the data.", e);
- }
-
- logger.debug("{} Received {} from {}", this, transactionResponse, peer);
- if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
- peer.penalize(destinationId, penaltyMillis);
- } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
- throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
- }
-
- state = TransactionState.TRANSACTION_COMPLETED;
- }
+
+ return new SocketClientTransactionCompletion(backoff, transfers, contentBytes, System.nanoTime() - creationNanoTime);
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to complete transaction with " + peer + " due to " + ioe, ioe);
+ }
} catch (final Exception e) {
error();
throw e;
@@ -274,81 +284,85 @@ public class SocketClientTransaction implements Transaction {
@Override
public void confirm() throws IOException {
try {
- if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
- // client requested to receive data but no data available. no need to confirm.
- state = TransactionState.TRANSACTION_CONFIRMED;
- return;
- }
-
- if ( state != TransactionState.DATA_EXCHANGED ) {
- throw new IllegalStateException("Cannot confirm Transaction because state is " + state +
- "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
- }
-
- if ( direction == TransferDirection.RECEIVE ) {
- if ( dataAvailable ) {
- throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
- }
-
- // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
- // to peer so that we can verify that the connection is still open. This is a two-phase commit,
- // which helps to prevent the chances of data duplication. Without doing this, we may commit the
- // session and then when we send the response back to the peer, the peer may have timed out and may not
- // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
- // Critical Section involved in this transaction so that rather than the Critical Section being the
- // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
- logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
- final String calculatedCRC = String.valueOf(crc.getValue());
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-
- final Response confirmTransactionResponse;
- try {
- confirmTransactionResponse = Response.read(dis);
- } catch (final IOException ioe) {
- logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
- throw ioe;
- }
-
- logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
-
- switch (confirmTransactionResponse.getCode()) {
- case CONFIRM_TRANSACTION:
- break;
- case BAD_CHECKSUM:
- throw new IOException(this + " Received a BadChecksum response from peer " + peer);
- default:
- throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
- }
-
- state = TransactionState.TRANSACTION_CONFIRMED;
- } else {
- logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
- ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
-
- final String calculatedCRC = String.valueOf(crc.getValue());
-
- // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
- final Response transactionConfirmationResponse = Response.read(dis);
- if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
- // Confirm checksum and echo back the confirmation.
- logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
- final String receivedCRC = transactionConfirmationResponse.getMessage();
+ try {
+ if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) {
+ // client requested to receive data but no data available. no need to confirm.
+ state = TransactionState.TRANSACTION_CONFIRMED;
+ return;
+ }
+
+ if ( state != TransactionState.DATA_EXCHANGED ) {
+ throw new IllegalStateException("Cannot confirm Transaction because state is " + state +
+ "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED );
+ }
+
+ if ( direction == TransferDirection.RECEIVE ) {
+ if ( dataAvailable ) {
+ throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
+ }
- // CRC was not used before version 4
- if ( protocolVersion > 3 ) {
- if ( !receivedCRC.equals(calculatedCRC) ) {
- ResponseCode.BAD_CHECKSUM.writeResponse(dos);
- throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
- }
+ // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+ // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+ // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+ // session and then when we send the response back to the peer, the peer may have timed out and may not
+ // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+ // Critical Section involved in this transaction so that rather than the Critical Section being the
+ // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+ logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+ final String calculatedCRC = String.valueOf(crc.getValue());
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+ final Response confirmTransactionResponse;
+ try {
+ confirmTransactionResponse = Response.read(dis);
+ } catch (final IOException ioe) {
+ logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer);
+ throw ioe;
}
- ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+ switch (confirmTransactionResponse.getCode()) {
+ case CONFIRM_TRANSACTION:
+ break;
+ case BAD_CHECKSUM:
+ throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+ default:
+ throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ }
+
+ state = TransactionState.TRANSACTION_CONFIRMED;
} else {
- throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+
+ final String calculatedCRC = String.valueOf(crc.getValue());
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(dis);
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm checksum and echo back the confirmation.
+ logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ // CRC was not used before version 4
+ if ( protocolVersion > 3 ) {
+ if ( !receivedCRC.equals(calculatedCRC) ) {
+ ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+
+ state = TransactionState.TRANSACTION_CONFIRMED;
}
-
- state = TransactionState.TRANSACTION_CONFIRMED;
- }
+ } catch (final IOException ioe) {
+ throw new IOException("Failed to confirm transaction with " + peer + " due to " + ioe, ioe);
+ }
} catch (final Exception e) {
error();
throw e;
@@ -365,4 +379,13 @@ public class SocketClientTransaction implements Transaction {
return state;
}
+ @Override
+ public Communicant getCommunicant() {
+ return peer;
+ }
+
+ @Override
+ public String toString() {
+ return "SocketClientTransaction[Url=" + peer.getUrl() + ", TransferDirection=" + direction + ", State=" + state + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
new file mode 100644
index 0000000..5eb6c91
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransactionCompletion.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.protocol.socket;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.remote.TransactionCompletion;
+
+public class SocketClientTransactionCompletion implements TransactionCompletion {
+
+ private final boolean backoff;
+ private final int dataPacketsTransferred;
+ private final long bytesTransferred;
+ private final long durationNanos;
+
+ public SocketClientTransactionCompletion(final boolean backoff, final int dataPacketsTransferred, final long bytesTransferred, final long durationNanos) {
+ this.backoff = backoff;
+ this.dataPacketsTransferred = dataPacketsTransferred;
+ this.bytesTransferred = bytesTransferred;
+ this.durationNanos = durationNanos;
+ }
+
+ @Override
+ public boolean isBackoff() {
+ return backoff;
+ }
+
+ @Override
+ public int getDataPacketsTransferred() {
+ return dataPacketsTransferred;
+ }
+
+ @Override
+ public long getBytesTransferred() {
+ return bytesTransferred;
+ }
+
+ @Override
+ public long getDuration(final TimeUnit timeUnit) {
+ return timeUnit.convert(durationNanos, TimeUnit.NANOSECONDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index a744905..2fd90f8 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
@@ -35,13 +36,13 @@ import org.junit.Test;
public class TestSiteToSiteClient {
@Test
- @Ignore("For local testing only; not really a unit test but a manual test")
+ //@Ignore("For local testing only; not really a unit test but a manual test")
public void testReceive() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
.url("http://localhost:8080/nifi")
- .portName("out")
+ .portName("cba")
.requestBatchCount(1)
.build();
@@ -62,7 +63,7 @@ public class TestSiteToSiteClient {
Assert.assertNull(transaction.receive());
transaction.confirm();
- transaction.complete(false);
+ transaction.complete();
} finally {
client.close();
}
@@ -70,13 +71,14 @@ public class TestSiteToSiteClient {
@Test
- @Ignore("For local testing only; not really a unit test but a manual test")
+ //@Ignore("For local testing only; not really a unit test but a manual test")
public void testSend() throws IOException {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG");
final SiteToSiteClient client = new SiteToSiteClient.Builder()
- .url("http://localhost:8080/nifi")
- .portName("in")
+ .url("http://10.0.64.63:8080/nifi")
+ .portName("input")
+ .nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
.build();
try {
@@ -91,7 +93,7 @@ public class TestSiteToSiteClient {
transaction.send(packet);
transaction.confirm();
- transaction.complete(false);
+ transaction.complete();
} finally {
client.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 3295956..8a4839b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -22,6 +22,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -130,7 +131,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
- String hostname = socket.getInetAddress().getHostName();
+ LOG.debug("{} Determining URL of connection", this);
+ final InetAddress inetAddress = socket.getInetAddress();
+ String hostname = inetAddress.getHostName();
final int slashIndex = hostname.indexOf("/");
if ( slashIndex == 0 ) {
hostname = hostname.substring(1);
@@ -140,6 +143,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
final int port = socket.getPort();
final String peerUri = "nifi://" + hostname + ":" + port;
+ LOG.debug("{} Connection URL is {}", this, peerUri);
final CommunicationsSession commsSession;
final String dn;
@@ -154,6 +158,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
dn = sslSocketChannel.getDn();
commsSession.setUserDn(dn);
} else {
+ LOG.trace("{} Channel is not secure", this);
commsSession = new SocketChannelCommunicationsSession(socketChannel, peerUri);
dn = null;
}
@@ -306,6 +311,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
}
});
thread.setName("Site-to-Site Worker Thread-" + (threadCount++));
+ LOG.debug("Handing connection to {}", thread);
thread.start();
}
}