You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/23 20:57:05 UTC
[12/29] incubator-nifi git commit: Refactored client and add javadocs
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 1e33e1f..da9d027 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -25,8 +26,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
@@ -34,28 +33,30 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.client.socket.EndpointConnection;
-import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
-import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.protocol.ClientProtocol;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardRemoteGroupPort extends RemoteGroupPort {
+ private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
public static final String USER_AGENT = "NiFi-Site-to-Site";
public static final String CONTENT_TYPE = "application/octet-stream";
@@ -71,11 +72,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
private final SSLContext sslContext;
private final TransferDirection transferDirection;
- private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
+ private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
- private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
- private final Lock interruptLock = new ReentrantLock();
- private boolean shutdown = false; // guarded by codecLock
public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
@@ -112,16 +110,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
@Override
public void shutdown() {
super.shutdown();
- interruptLock.lock();
- try {
- this.shutdown = true;
- } finally {
- interruptLock.unlock();
- }
- final EndpointConnectionPool pool = connectionPoolRef.get();
- if ( pool != null ) {
- pool.shutdown();
+ // Close the Site-to-Site client created in onSchedulingStart(), if one has been created.
+ final SiteToSiteClient client = clientRef.get();
+ if ( client != null ) {
+ try {
+ client.close();
+ } catch (final IOException ioe) {
+ // Closing is best-effort: log the failure and carry on with shutdown.
+ // The exception text is passed explicitly for the {} placeholder; a lone trailing
+ // Throwable is taken by SLF4J as the exception to log and would leave "{}" unfilled.
+ logger.warn("Failed to properly shutdown Site-to-Site Client due to {}", ioe.toString(), ioe);
+ }
}
}
@@ -129,17 +125,14 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
public void onSchedulingStart() {
super.onSchedulingStart();
- interruptLock.lock();
- try {
- this.shutdown = false;
- } finally {
- interruptLock.unlock();
- }
-
- final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(),
- remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS),
- sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
- connectionPoolRef.set(connectionPool);
+ final SiteToSiteClient client = new SiteToSiteClient.Builder()
+ .url(remoteGroup.getTargetUri().toString())
+ .portIdentifier(getIdentifier())
+ .sslContext(sslContext)
+ .eventReporter(remoteGroup.getEventReporter())
+ .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+ .build();
+ clientRef.set(client);
}
@@ -157,10 +150,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
String url = getRemoteProcessGroup().getTargetUri().toString();
- final EndpointConnectionPool connectionPool = connectionPoolRef.get();
- final EndpointConnection connection;
+ final SiteToSiteClient client = clientRef.get();
+ final Transaction transaction;
try {
- connection = connectionPool.getEndpointConnection(this, transferDirection);
+ transaction = client.createTransaction(transferDirection);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
@@ -186,95 +179,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
return;
}
- if ( connection == null ) {
- logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
+ if ( transaction == null ) {
+ logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
context.yield();
return;
}
-
- FlowFileCodec codec = connection.getCodec();
- SocketClientProtocol protocol = connection.getSocketClientProtocol();
- final Peer peer = connection.getPeer();
- url = peer.getUrl();
-
+
try {
- interruptLock.lock();
- try {
- if ( shutdown ) {
- peer.getCommunicationsSession().interrupt();
- }
-
- activeCommsChannels.add(peer.getCommunicationsSession());
- } finally {
- interruptLock.unlock();
- }
-
if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
- transferFlowFiles(peer, protocol, context, session, codec);
+ transferFlowFiles(transaction, context, session);
} else {
- final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
+ final int numReceived = receiveFlowFiles(transaction, context, session);
if ( numReceived == 0 ) {
context.yield();
}
}
- interruptLock.lock();
- try {
- if ( shutdown ) {
- peer.getCommunicationsSession().interrupt();
- }
-
- activeCommsChannels.remove(peer.getCommunicationsSession());
- } finally {
- interruptLock.unlock();
- }
-
session.commit();
-
- connection.setLastTimeUsed();
- connectionPool.offer(connection);
- } catch (final TransmissionDisabledException e) {
- cleanup(protocol, peer);
- session.rollback();
} catch (final Exception e) {
- connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
-
- final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
- logger.error(message);
+ final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString());
+ logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString());
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
- cleanup(protocol, peer);
-
remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
session.rollback();
}
}
- private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
- if ( protocol != null && peer != null ) {
- try {
- protocol.shutdown(peer);
- } catch (final TransmissionDisabledException e) {
- // User disabled transmission.... do nothing.
- logger.debug(this + " Transmission Disabled by User");
- } catch (IOException e1) {
- }
- }
-
- if ( peer != null ) {
- try {
- peer.close();
- } catch (final TransmissionDisabledException e) {
- // User disabled transmission.... do nothing.
- logger.debug(this + " Transmission Disabled by User");
- } catch (IOException e1) {
- }
- }
- }
-
@Override
public String getYieldPeriod() {
// delegate yield duration to remote process group
@@ -282,12 +216,129 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
}
- private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- return protocol.transferFlowFiles(peer, context, session, codec);
+    /**
+     * Sends FlowFiles from the session to the remote instance over the given transaction.
+     * <p>
+     * FlowFiles are pulled from the session one at a time and sent until either the session has
+     * no more FlowFiles to offer or {@code BATCH_SEND_NANOS} has elapsed since sending began.
+     * Afterwards the transaction is confirmed, the session is committed and the transaction is
+     * completed, in that order.
+     *
+     * @param transaction the transaction used to send each FlowFile as a DataPacket
+     * @param context the process context (not referenced by this method)
+     * @param session the session that supplies the FlowFiles and records provenance
+     * @return the number of FlowFiles sent, or 0 if the session had no FlowFile available
+     * @throws IOException rethrown after the session has been rolled back
+     * @throws ProtocolException rethrown after the session has been rolled back
+     */
+    private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
+        FlowFile current = session.get();
+        if (current == null) {
+            return 0;
+        }
+
+        try {
+            final String remoteDn = transaction.getCommunicant().getDistinguishedName();
+            final long batchStartNanos = System.nanoTime();
+            final StopWatch timer = new StopWatch(true);
+            final Set<FlowFile> sent = new HashSet<>();
+            long totalBytes = 0L;
+
+            do {
+                final long sendStartNanos = System.nanoTime();
+
+                // Read the FlowFile content inside a session callback so that its InputStream
+                // can be handed to the transaction as the payload of a DataPacket.
+                final FlowFile outgoing = current;
+                session.read(outgoing, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        final DataPacket packet = new StandardDataPacket(outgoing.getAttributes(), in, outgoing.getSize());
+                        transaction.send(packet);
+                    }
+                });
+
+                final long sendMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - sendStartNanos);
+
+                sent.add(outgoing);
+                totalBytes += outgoing.getSize();
+                logger.debug("{} Sent {} to {}", this, outgoing, transaction.getCommunicant().getUrl());
+
+                final String transitUri = transaction.getCommunicant().getUrl() + "/" + outgoing.getAttribute(CoreAttributes.UUID.key());
+                session.getProvenanceReporter().send(outgoing, transitUri, "Remote DN=" + remoteDn, sendMillis, false);
+                session.remove(outgoing);
+
+                // Keep pulling FlowFiles only while the batch window is still open.
+                final boolean withinBatchWindow = (System.nanoTime() - batchStartNanos) < BATCH_SEND_NANOS;
+                current = withinBatchWindow ? session.get() : null;
+            } while (current != null);
+
+            transaction.confirm();
+
+            // Capture the transfer statistics before committing so they cover only the send itself.
+            timer.stop();
+            final String dataRate = timer.calculateDataRate(totalBytes);
+            final long elapsedMillis = timer.getDuration(TimeUnit.MILLISECONDS);
+            final String formattedSize = FormatUtils.formatDataSize(totalBytes);
+
+            session.commit();
+            transaction.complete();
+
+            // List the individual FlowFiles only for small batches; otherwise log just the count.
+            final int sentCount = sent.size();
+            final String description;
+            if (sentCount < 20) {
+                description = sent.toString();
+            } else {
+                description = sentCount + " FlowFiles";
+            }
+            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+                this, description, formattedSize, transaction.getCommunicant().getUrl(), elapsedMillis, dataRate});
+
+            return sentCount;
+        } catch (final Exception failure) {
+            session.rollback();
+            throw failure;
+        }
     }
- private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
- return protocol.receiveFlowFiles(peer, context, session, codec);
+    /**
+     * Receives DataPackets from the given transaction until {@code transaction.receive()} returns
+     * null, creating one FlowFile per packet and transferring it to {@link Relationship#ANONYMOUS}.
+     * <p>
+     * Once no more packets are available the transaction is confirmed, the session is committed
+     * and the transaction is completed, in that order.
+     *
+     * @param transaction the transaction to receive DataPackets from
+     * @param context the process context (not referenced by this method)
+     * @param session the session used to create, populate and transfer the received FlowFiles
+     * @return the number of FlowFiles received; the caller yields when this is 0
+     * @throws IOException declared by this method; not caught here
+     * @throws ProtocolException declared by this method; not caught here
+     */
+    private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
+        final String userDn = transaction.getCommunicant().getDistinguishedName();
+
+        final StopWatch stopWatch = new StopWatch(true);
+        final Set<FlowFile> flowFilesReceived = new HashSet<>();
+        long bytesReceived = 0L;
+
+        while (true) {
+            final long start = System.nanoTime();
+            final DataPacket dataPacket = transaction.receive();
+            if ( dataPacket == null ) {
+                // A null packet signals that there is nothing more to receive.
+                break;
+            }
+
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            final long receiveNanos = System.nanoTime() - start;
+
+            String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+            if ( sourceFlowFileIdentifier == null ) {
+                sourceFlowFileIdentifier = "<Unknown Identifier>";
+            }
+
+            // Separate the peer URL from the identifier with "/", the same way the transit URI
+            // is built when FlowFiles are sent.
+            final String transitUri = transaction.getCommunicant().getUrl() + "/" + sourceFlowFileIdentifier;
+            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
+                "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+            session.transfer(flowFile, Relationship.ANONYMOUS);
+
+            // Track the FlowFile so that the returned count and the summary log reflect what was
+            // actually received; without this the method would always report 0.
+            flowFilesReceived.add(flowFile);
+            bytesReceived += dataPacket.getSize();
+        }
+
+        // Confirm that what we received was the correct data.
+        transaction.confirm();
+
+        // Commit the session so that we have persisted the data
+        session.commit();
+
+        transaction.complete();
+
+        if ( !flowFilesReceived.isEmpty() ) {
+            stopWatch.stop();
+            // List the individual FlowFiles only for small batches; otherwise log just the count.
+            final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+            final String downloadDataRate = stopWatch.calculateDataRate(bytesReceived);
+            final long downloadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+            final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+            logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+                this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), downloadMillis, downloadDataRate });
+        }
+
+        return flowFilesReceived.size();
     }
@Override