You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:15 UTC

[12/51] [abbrv] incubator-nifi git commit: Refactored client and add javadocs

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d1e058cd/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index 1e33e1f..da9d027 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -18,6 +18,7 @@ package org.apache.nifi.remote;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -25,8 +26,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 import javax.net.ssl.SSLContext;
 
@@ -34,28 +33,30 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.client.socket.EndpointConnection;
-import org.apache.nifi.remote.client.socket.EndpointConnectionPool;
-import org.apache.nifi.remote.codec.FlowFileCodec;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.remote.client.SiteToSiteClient;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
-import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.exception.UnknownPortException;
-import org.apache.nifi.remote.protocol.ClientProtocol;
-import org.apache.nifi.remote.protocol.CommunicationsSession;
-import org.apache.nifi.remote.protocol.socket.SocketClientProtocol;
+import org.apache.nifi.remote.protocol.DataPacket;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRemoteGroupPort extends RemoteGroupPort {
+    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
     
@@ -71,11 +72,8 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     private final SSLContext sslContext;
     private final TransferDirection transferDirection;
     
-    private final AtomicReference<EndpointConnectionPool> connectionPoolRef = new AtomicReference<>();
+    private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
     
-    private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>();
-    private final Lock interruptLock = new ReentrantLock();
-    private boolean shutdown = false;   // guarded by codecLock
     
     public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, 
             final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
@@ -112,16 +110,19 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     @Override
     public void shutdown() {
     	super.shutdown();
-        interruptLock.lock();
-        try {
-            this.shutdown = true;
-        } finally {
-            interruptLock.unlock();
-        }
         
-        final EndpointConnectionPool pool = connectionPoolRef.get();
-        if ( pool != null ) {
-            pool.shutdown();
+        // Close the Site-to-Site client published by onSchedulingStart(), if there is one.
+        // A failure to close is logged and otherwise ignored so that shutdown still completes.
+        final SiteToSiteClient client = clientRef.get();
+        if ( client != null ) {
+            try {
+                client.close();
+            } catch (final IOException ioe) {
+                // A lone exception argument binds to warn(String, Throwable), which performs no
+                // "{}" substitution, so the cause is concatenated into the message and the
+                // exception is still passed along for its stack trace.
+                logger.warn("Failed to properly shutdown Site-to-Site Client due to " + ioe, ioe);
+            }
         }
     }
     
@@ -129,17 +125,19 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     public void onSchedulingStart() {
         super.onSchedulingStart();
         
-        interruptLock.lock();
-        try {
-            this.shutdown = false;
-        } finally {
-            interruptLock.unlock();
-        }
-        
-        final EndpointConnectionPool connectionPool = new EndpointConnectionPool(remoteGroup.getTargetUri().toString(), 
-                remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), 
-                sslContext, remoteGroup.getEventReporter(), getPeerPersistenceFile(getIdentifier()));
-        connectionPoolRef.set(connectionPool);
+        // Build a new Site-to-Site client for this port and publish it through clientRef,
+        // which is where shutdown() and the trigger path obtain it.
+        final SiteToSiteClient client = new SiteToSiteClient.Builder()
+            .url(remoteGroup.getTargetUri().toString())
+            // Honor the remote group's configured communications timeout; the
+            // EndpointConnectionPool that this client replaces was constructed with it.
+            .timeout(remoteGroup.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
+            .portIdentifier(getIdentifier())
+            .sslContext(sslContext)
+            .eventReporter(remoteGroup.getEventReporter())
+            .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+            .build();
+        clientRef.set(client);
     }
     
     
@@ -157,10 +150,10 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         String url = getRemoteProcessGroup().getTargetUri().toString();
         
-        final EndpointConnectionPool connectionPool = connectionPoolRef.get();
-        final EndpointConnection connection;
+        final SiteToSiteClient client = clientRef.get();
+        final Transaction transaction;
         try {
-        	connection = connectionPool.getEndpointConnection(this, transferDirection);
+        	transaction = client.createTransaction(transferDirection);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
@@ -186,95 +179,36 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             return;
         }
         
-        if ( connection == null ) {
-            logger.debug("{} Unable to determine the next peer to communicate with; all peers must be penalized, so yielding context", this);
+        if ( transaction == null ) {
+            logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
             context.yield();
             return;
         }
-        
-        FlowFileCodec codec = connection.getCodec();
-        SocketClientProtocol protocol = connection.getSocketClientProtocol();
-        final Peer peer = connection.getPeer();
-        url = peer.getUrl();
-        
+
         try {
-            interruptLock.lock();
-            try {
-                if ( shutdown ) {
-                    peer.getCommunicationsSession().interrupt();
-                }
-                
-                activeCommsChannels.add(peer.getCommunicationsSession());
-            } finally {
-                interruptLock.unlock();
-            }
-            
             if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
-                transferFlowFiles(peer, protocol, context, session, codec);
+                transferFlowFiles(transaction, context, session);
             } else {
-                final int numReceived = receiveFlowFiles(peer, protocol, context, session, codec);
+                final int numReceived = receiveFlowFiles(transaction, context, session);
                 if ( numReceived == 0 ) {
                     context.yield();
                 }
             }
 
-            interruptLock.lock();
-            try {
-                if ( shutdown ) {
-                    peer.getCommunicationsSession().interrupt();
-                }
-                
-                activeCommsChannels.remove(peer.getCommunicationsSession());
-            } finally {
-                interruptLock.unlock();
-            }
-
             session.commit();
-            
-            connection.setLastTimeUsed();
-            connectionPool.offer(connection);
-        } catch (final TransmissionDisabledException e) {
-            cleanup(protocol, peer);
-            session.rollback();
         } catch (final Exception e) {
-            connectionPool.penalize(peer, getYieldPeriod(TimeUnit.MILLISECONDS));
-
-            final String message = String.format("%s failed to communicate with %s (%s) due to %s", this, peer == null ? url : peer, protocol, e.toString());
-            logger.error(message);
+            final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, e.toString());
+            logger.error("{} failed to communicate with remote NiFi instance due to {}", this, e.toString());
             if ( logger.isDebugEnabled() ) {
                 logger.error("", e);
             }
             
-            cleanup(protocol, peer);
-            
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             session.rollback();
         }
     }
 
     
-    private void cleanup(final SocketClientProtocol protocol, final Peer peer) {
-        if ( protocol != null && peer != null ) {
-            try {
-                protocol.shutdown(peer);
-            } catch (final TransmissionDisabledException e) {
-                // User disabled transmission.... do nothing.
-                logger.debug(this + " Transmission Disabled by User");
-            } catch (IOException e1) {
-            }
-        }
-        
-        if ( peer != null ) {
-            try {
-                peer.close();
-            } catch (final TransmissionDisabledException e) {
-                // User disabled transmission.... do nothing.
-                logger.debug(this + " Transmission Disabled by User");
-            } catch (IOException e1) {
-            }
-        }
-    }
-    
     @Override
     public String getYieldPeriod() {
         // delegate yield duration to remote process group
@@ -282,12 +216,159 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     }
     
     
-    private int transferFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        return protocol.transferFlowFiles(peer, context, session, codec);
+    /**
+     * Sends FlowFiles from the session to the remote instance over the given transaction.
+     * FlowFiles are pulled and sent one at a time until the session has none left or
+     * BATCH_SEND_NANOS have elapsed since sending began. The transaction is then confirmed,
+     * the session committed and the transaction completed. If anything fails, the session
+     * is rolled back and the exception is rethrown to the caller.
+     *
+     * @param transaction the transaction used to send the data
+     * @param context the process context; not used by this method
+     * @param session the session that FlowFiles are pulled from and removed from once sent
+     * @return the number of FlowFiles sent, or 0 if the session had no FlowFile to offer
+     * @throws IOException propagated from the session or transaction calls made here
+     * @throws ProtocolException propagated from the session or transaction calls made here
+     */
+    private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return 0;
+        }
+
+        try {
+            final String userDn = transaction.getCommunicant().getDistinguishedName();
+            final long startSendingNanos = System.nanoTime();
+            final StopWatch stopWatch = new StopWatch(true);
+            long bytesSent = 0L;
+            
+            final Set<FlowFile> flowFilesSent = new HashSet<>();
+            boolean continueTransaction = true;
+            while (continueTransaction) {
+                final long startNanos = System.nanoTime();
+                // send the FlowFile within a session callback so that we have the InputStream to read its content
+                final FlowFile toWrap = flowFile;
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream in) throws IOException {
+                        final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize());
+                        transaction.send(dataPacket);
+                    }
+                });
+                
+                final long transferNanos = System.nanoTime() - startNanos;
+                final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+                
+                flowFilesSent.add(flowFile);
+                bytesSent += flowFile.getSize();
+                logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
+                
+                final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
+                session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
+                session.remove(flowFile);
+                
+                final long sendingNanos = System.nanoTime() - startSendingNanos;
+                if ( sendingNanos < BATCH_SEND_NANOS ) { 
+                    flowFile = session.get();
+                } else {
+                    flowFile = null;
+                }
+                
+                continueTransaction = (flowFile != null);
+            }
+            
+            transaction.confirm();
+            
+            // every FlowFile in this batch has been sent and the transaction confirmed;
+            // stop the clock and gather the transfer statistics for the summary log below
+            stopWatch.stop();
+            final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+            final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+            final String dataSize = FormatUtils.formatDataSize(bytesSent);
+            
+            session.commit();
+            transaction.complete();
+            
+            final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+                this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
+            
+            return flowFilesSent.size();
+        } catch (final Exception e) {
+            session.rollback();
+            throw e;
+        }
+
+        
     }
     
-    private int receiveFlowFiles(final Peer peer, final ClientProtocol protocol, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        return protocol.receiveFlowFiles(peer, context, session, codec);
+    /**
+     * Receives data packets from the remote instance over the given transaction until none
+     * remain, creating one FlowFile per packet and transferring it to Relationship.ANONYMOUS.
+     * The transaction is then confirmed, the session committed and the transaction completed.
+     *
+     * @param transaction the transaction used to receive the data
+     * @param context the process context; not used by this method
+     * @param session the session in which the received FlowFiles are created
+     * @return the number of FlowFiles received; 0 when no data packet was available
+     * @throws IOException propagated from the session or transaction calls made here
+     * @throws ProtocolException propagated from the session or transaction calls made here
+     */
+    private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
+        final String userDn = transaction.getCommunicant().getDistinguishedName();
+        
+        final StopWatch stopWatch = new StopWatch(true);
+        final Set<FlowFile> flowFilesReceived = new HashSet<>();
+        long bytesReceived = 0L;
+        
+        while (true) {
+            final long start = System.nanoTime();
+            final DataPacket dataPacket = transaction.receive();
+            if ( dataPacket == null ) {
+                break;
+            }
+            
+            FlowFile flowFile = session.create();
+            flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
+            flowFile = session.importFrom(dataPacket.getData(), flowFile);
+            final long receiveNanos = System.nanoTime() - start;
+            
+            String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
+            if ( sourceFlowFileIdentifier == null ) {
+                sourceFlowFileIdentifier = "<Unknown Identifier>";
+            }
+            
+            // separate the peer URL from the identifier with "/", as transferFlowFiles does
+            final String transitUri = transaction.getCommunicant().getUrl() + "/" + sourceFlowFileIdentifier;
+            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, 
+                    "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
+
+            session.transfer(flowFile, Relationship.ANONYMOUS);
+            // Track the FlowFile so that the return value and the summary log reflect what was
+            // received; the caller yields whenever this method returns 0.
+            flowFilesReceived.add(flowFile);
+            bytesReceived += dataPacket.getSize();
+        }
+
+        // Confirm that what we received was the correct data.
+        transaction.confirm();
+        
+        // Commit the session so that we have persisted the data
+        session.commit();
+
+        transaction.complete();
+
+        if ( !flowFilesReceived.isEmpty() ) {
+            stopWatch.stop();
+            final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+            final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+            final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+            final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+            logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { 
+                    this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate });
+        }
+        
+        return flowFilesReceived.size();
     }
 
     @Override