You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:12:57 UTC

[07/49] incubator-nifi git commit: NIFI-271 checkpoint push because there are so many changes. There is still a long way to go, but the DTO library is done.

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
index 2b27de2..b0ce357 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java
@@ -28,22 +28,22 @@ import org.apache.nifi.remote.protocol.ServerProtocol;
 
 public class RemoteResourceFactory extends RemoteResourceInitiator {
 
-	@SuppressWarnings("unchecked")
+    @SuppressWarnings("unchecked")
     public static <T extends FlowFileCodec> T receiveCodecNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
         final String codecName = dis.readUTF();
         final int version = dis.readInt();
-        
+
         final T codec = (T) RemoteResourceManager.createCodec(codecName, version);
         final VersionNegotiator negotiator = codec.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return codec;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
+            if (preferred == null) {
                 dos.write(ABORT);
                 dos.flush();
                 throw new HandshakeException("Unable to negotiate an acceptable version of the FlowFileCodec " + codecName);
@@ -51,36 +51,36 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveCodecNegotiation(dis, dos);
         }
-	}
-	
-	public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
-		dis.readUTF();	// read codec name
-		dis.readInt();	// read codec version
-		
-		dos.write(ABORT);
-		dos.writeUTF(explanation);
-		dos.flush();
-	}
-	
-	@SuppressWarnings("unchecked")
+    }
+
+    public static void rejectCodecNegotiation(final DataInputStream dis, final DataOutputStream dos, final String explanation) throws IOException {
+        dis.readUTF();  // read codec name
+        dis.readInt();  // read codec version
+
+        dos.write(ABORT);
+        dos.writeUTF(explanation);
+        dos.flush();
+    }
+
+    @SuppressWarnings("unchecked")
     public static <T extends ClientProtocol> T receiveClientProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
         final String protocolName = dis.readUTF();
         final int version = dis.readInt();
-        
+
         final T protocol = (T) RemoteResourceManager.createClientProtocol(protocolName);
         final VersionNegotiator negotiator = protocol.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return protocol;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
+            if (preferred == null) {
                 dos.write(ABORT);
                 dos.flush();
                 throw new HandshakeException("Unable to negotiate an acceptable version of the ClientProtocol " + protocolName);
@@ -88,28 +88,27 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveClientProtocolNegotiation(dis, dos);
         }
     }
-    
-	
-	@SuppressWarnings("unchecked")
+
+    @SuppressWarnings("unchecked")
     public static <T extends ServerProtocol> T receiveServerProtocolNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
-	    final String protocolName = dis.readUTF();
+        final String protocolName = dis.readUTF();
         final int version = dis.readInt();
-        
+
         final T protocol = (T) RemoteResourceManager.createServerProtocol(protocolName);
         final VersionNegotiator negotiator = protocol.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return protocol;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
+            if (preferred == null) {
                 dos.write(ABORT);
                 dos.flush();
                 throw new HandshakeException("Unable to negotiate an acceptable version of the ServerProtocol " + protocolName);
@@ -117,54 +116,53 @@ public class RemoteResourceFactory extends RemoteResourceInitiator {
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveServerProtocolNegotiation(dis, dos);
         }
     }
-    
-	
-	
-	
-	public static <T extends VersionedRemoteResource> T receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] constructorArgs) throws IOException, HandshakeException {
-		final String resourceClassName = dis.readUTF();
-		final T resource;
-		try {
+
+    public static <T extends VersionedRemoteResource> T
+        receiveResourceNegotiation(final Class<T> cls, final DataInputStream dis, final DataOutputStream dos, final Class<?>[] constructorArgClasses, final Object[] constructorArgs)
+                throws IOException, HandshakeException {
+        final String resourceClassName = dis.readUTF();
+        final T resource;
+        try {
             @SuppressWarnings("unchecked")
-			final Class<T> resourceClass = (Class<T>) Class.forName(resourceClassName);
-            if ( !cls.isAssignableFrom(resourceClass) ) {
-            	throw new HandshakeException("Expected to negotiate a Versioned Resource of type " + cls.getName() + " but received class name of " + resourceClassName);
+            final Class<T> resourceClass = (Class<T>) Class.forName(resourceClassName);
+            if (!cls.isAssignableFrom(resourceClass)) {
+                throw new HandshakeException("Expected to negotiate a Versioned Resource of type " + cls.getName() + " but received class name of " + resourceClassName);
             }
-            
+
             final Constructor<T> ctr = resourceClass.getConstructor(constructorArgClasses);
             resource = ctr.newInstance(constructorArgs);
         } catch (final Throwable t) {
-        	dos.write(ABORT);
-        	final String errorMsg = "Unable to instantiate Versioned Resource of type " + resourceClassName;
-        	dos.writeUTF(errorMsg);
-        	dos.flush();
-        	throw new HandshakeException(errorMsg);
+            dos.write(ABORT);
+            final String errorMsg = "Unable to instantiate Versioned Resource of type " + resourceClassName;
+            dos.writeUTF(errorMsg);
+            dos.flush();
+            throw new HandshakeException(errorMsg);
         }
-		
+
         final int version = dis.readInt();
         final VersionNegotiator negotiator = resource.getVersionNegotiator();
-        if ( negotiator.isVersionSupported(version) ) {
+        if (negotiator.isVersionSupported(version)) {
             dos.write(RESOURCE_OK);
             dos.flush();
-            
+
             negotiator.setVersion(version);
             return resource;
         } else {
             final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
-            	dos.write(ABORT);
-            	dos.flush();
-            	throw new HandshakeException("Unable to negotiate an acceptable version of the resource " + resourceClassName);
+            if (preferred == null) {
+                dos.write(ABORT);
+                dos.flush();
+                throw new HandshakeException("Unable to negotiate an acceptable version of the resource " + resourceClassName);
             }
             dos.write(DIFFERENT_RESOURCE_VERSION);
             dos.writeInt(preferred);
             dos.flush();
-            
+
             return receiveResourceNegotiation(cls, dis, dos, constructorArgClasses, constructorArgs);
         }
-	}
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
index f86f066..8bbe7aa 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceManager.java
@@ -34,20 +34,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RemoteResourceManager {
+
     private static final Map<String, Class<? extends FlowFileCodec>> codecClassMap;
     private static final Map<String, Class<? extends ServerProtocol>> desiredServerProtocolClassMap = new ConcurrentHashMap<>();
     private static final Map<String, Class<? extends ClientProtocol>> desiredClientProtocolClassMap = new ConcurrentHashMap<>();
-    
+
     private static final Map<String, Set<Class<? extends ServerProtocol>>> serverProtocolClassMap;
     private static final Map<String, Set<Class<? extends ClientProtocol>>> clientProtocolClassMap;
-    
+
     private static final Logger logger = LoggerFactory.getLogger(RemoteResourceManager.class);
-    
+
     static {
         final Map<String, Class<? extends FlowFileCodec>> codecMap = new HashMap<>();
         final Map<String, Set<Class<? extends ServerProtocol>>> serverProtocolMap = new HashMap<>();
         final Map<String, Set<Class<? extends ClientProtocol>>> clientProtocolMap = new HashMap<>();
-        
+
         // load all of the FlowFileCodecs that we know
         final ClassLoader classLoader = RemoteResourceManager.class.getClassLoader();
         final ServiceLoader<FlowFileCodec> flowFileCodecLoader = ServiceLoader.load(FlowFileCodec.class, classLoader);
@@ -58,12 +59,12 @@ public class RemoteResourceManager {
             final String codecName = codec.getResourceName();
 
             final Class<? extends FlowFileCodec> previousValue = codecMap.put(codecName, clazz);
-            if ( previousValue != null ) {
-                logger.warn("Multiple FlowFileCodec's found with name {}; choosing to use {} in place of {}", 
-                    new Object[] {codecName, clazz.getName(), previousValue.getName()});
+            if (previousValue != null) {
+                logger.warn("Multiple FlowFileCodec's found with name {}; choosing to use {} in place of {}",
+                        new Object[]{codecName, clazz.getName(), previousValue.getName()});
             }
         }
-        
+
         final ServiceLoader<ServerProtocol> serverProtocolLoader = ServiceLoader.load(ServerProtocol.class, classLoader);
         final Iterator<ServerProtocol> serverItr = serverProtocolLoader.iterator();
         while (serverItr.hasNext()) {
@@ -72,14 +73,14 @@ public class RemoteResourceManager {
             final String protocolName = protocol.getResourceName();
 
             Set<Class<? extends ServerProtocol>> classSet = serverProtocolMap.get(protocolName);
-            if ( classSet == null ) {
+            if (classSet == null) {
                 classSet = new HashSet<>();
                 serverProtocolMap.put(protocolName, classSet);
             }
-            
+
             classSet.add(clazz);
         }
-        
+
         final ServiceLoader<ClientProtocol> clientProtocolLoader = ServiceLoader.load(ClientProtocol.class, classLoader);
         final Iterator<ClientProtocol> clientItr = clientProtocolLoader.iterator();
         while (clientItr.hasNext()) {
@@ -88,133 +89,132 @@ public class RemoteResourceManager {
             final String protocolName = protocol.getResourceName();
 
             Set<Class<? extends ClientProtocol>> classSet = clientProtocolMap.get(protocolName);
-            if ( classSet == null ) {
+            if (classSet == null) {
                 classSet = new HashSet<>();
                 clientProtocolMap.put(protocolName, classSet);
             }
-            
+
             classSet.add(clazz);
         }
-        
+
         codecClassMap = Collections.unmodifiableMap(codecMap);
         clientProtocolClassMap = Collections.unmodifiableMap(clientProtocolMap);
         serverProtocolClassMap = Collections.unmodifiableMap(serverProtocolMap);
     }
 
-    
     public static boolean isCodecSupported(final String codecName) {
         return codecClassMap.containsKey(codecName);
     }
-    
+
     public static boolean isCodecSupported(final String codecName, final int version) {
-        if ( !isCodecSupported(codecName) ) {
+        if (!isCodecSupported(codecName)) {
             return false;
         }
-        
+
         final FlowFileCodec codec = createCodec(codecName);
         final VersionNegotiator negotiator = codec.getVersionNegotiator();
         return (negotiator.isVersionSupported(version));
     }
-    
+
     public static FlowFileCodec createCodec(final String codecName, final int version) {
         final FlowFileCodec codec = createCodec(codecName);
         final VersionNegotiator negotiator = codec.getVersionNegotiator();
-        if ( !negotiator.isVersionSupported(version) ) {
+        if (!negotiator.isVersionSupported(version)) {
             throw new IllegalArgumentException("FlowFile Codec " + codecName + " does not support version " + version);
         }
-        
+
         negotiator.setVersion(version);
         return codec;
     }
-    
+
     private static FlowFileCodec createCodec(final String codecName) {
         final Class<? extends FlowFileCodec> codecClass = codecClassMap.get(codecName);
-        if ( codecClass == null ) {
+        if (codecClass == null) {
             throw new IllegalArgumentException("Unknown Codec: " + codecName);
         }
-        
+
         try {
             return codecClass.newInstance();
         } catch (final Exception e) {
             throw new RuntimeException("Unable to instantiate class " + codecClass.getName(), e);
         }
     }
-    
+
     public static Set<String> getSupportedCodecNames() {
         return codecClassMap.keySet();
     }
-    
+
     public static List<Integer> getSupportedVersions(final String codecName) {
         final FlowFileCodec codec = createCodec(codecName);
         return codec.getSupportedVersions();
     }
-    
+
     public static Set<Class<? extends ClientProtocol>> getClientProtocolClasses(final String protocolName) {
         final Set<Class<? extends ClientProtocol>> classes = clientProtocolClassMap.get(protocolName);
-        if ( classes == null ) {
+        if (classes == null) {
             return new HashSet<>();
         }
         return new HashSet<>(classes);
     }
-    
+
     public static Set<Class<? extends ServerProtocol>> getServerProtocolClasses(final String protocolName) {
         final Set<Class<? extends ServerProtocol>> classes = serverProtocolClassMap.get(protocolName);
-        if ( classes == null ) {
+        if (classes == null) {
             return new HashSet<>();
         }
         return new HashSet<>(classes);
     }
-    
+
     public static void setServerProtocolImplementation(final String protocolName, final Class<? extends ServerProtocol> clazz) {
         desiredServerProtocolClassMap.put(protocolName, clazz);
     }
-    
+
     public static void setClientProtocolImplementation(final String protocolName, final Class<? extends ClientProtocol> clazz) {
         desiredClientProtocolClassMap.put(protocolName, clazz);
     }
-    
+
     public static ServerProtocol createServerProtocol(final String protocolName) {
         final Set<Class<? extends ServerProtocol>> classSet = getServerProtocolClasses(protocolName);
-        if ( classSet.isEmpty() ) {
+        if (classSet.isEmpty()) {
             throw new IllegalArgumentException("Unknkown Server Protocol: " + protocolName);
         }
 
         Class<? extends ServerProtocol> desiredClass = desiredServerProtocolClassMap.get(protocolName);
-        if ( desiredClass == null && classSet.size() > 1 ) {
+        if (desiredClass == null && classSet.size() > 1) {
             throw new IllegalStateException("Multiple implementations of Server Protocol " + protocolName + " were found and no preferred implementation has been specified");
         }
-        
-        if ( desiredClass != null && !classSet.contains(desiredClass) ) {
+
+        if (desiredClass != null && !classSet.contains(desiredClass)) {
             throw new IllegalStateException("Desired implementation of Server Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol is not registered as a Server Protocol");
         }
-        
-        if ( desiredClass == null ) {
+
+        if (desiredClass == null) {
             desiredClass = classSet.iterator().next();
         }
-        
+
         try {
             return desiredClass.newInstance();
         } catch (final Exception e) {
             throw new RuntimeException("Unable to instantiate class " + desiredClass.getName(), e);
-        }        
+        }
     }
-    
+
     public static ClientProtocol createClientProtocol(final String protocolName) {
         final Set<Class<? extends ClientProtocol>> classSet = getClientProtocolClasses(protocolName);
-        if ( classSet.isEmpty() ) {
+        if (classSet.isEmpty()) {
             throw new IllegalArgumentException("Unknkown Client Protocol: " + protocolName);
         }
 
         Class<? extends ClientProtocol> desiredClass = desiredClientProtocolClassMap.get(protocolName);
-        if ( desiredClass == null && classSet.size() > 1 ) {
+        if (desiredClass == null && classSet.size() > 1) {
             throw new IllegalStateException("Multiple implementations of Client Protocol " + protocolName + " were found and no preferred implementation has been specified");
         }
-        
-        if ( desiredClass != null && !classSet.contains(desiredClass) ) {
+
+        if (desiredClass != null && !classSet.contains(desiredClass)) {
             throw new IllegalStateException("Desired implementation of Client Protocol " + protocolName + " is set to " + desiredClass + ", but that Protocol is not registered as a Client Protocol");
         }
-        
-        if ( desiredClass == null ) {
+
+        if (desiredClass == null) {
             desiredClass = classSet.iterator().next();
         }
 
@@ -222,6 +222,6 @@ public class RemoteResourceManager {
             return desiredClass.newInstance();
         } catch (final Exception e) {
             throw new RuntimeException("Unable to instantiate class " + desiredClass.getName(), e);
-        }        
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
index 59e4d0a..6f7b977 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/RemoteSiteListener.java
@@ -30,4 +30,4 @@ public interface RemoteSiteListener {
 
     void stop();
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index 493d1fe..809147e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -49,43 +49,42 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketRemoteSiteListener implements RemoteSiteListener {
+
     public static final String DEFAULT_FLOWFILE_PATH = "./";
 
     private final int socketPort;
     private final SSLContext sslContext;
     private final NodeInformant nodeInformant;
     private final AtomicReference<ProcessGroup> rootGroup = new AtomicReference<>();
-    
+
     private final AtomicBoolean stopped = new AtomicBoolean(false);
-    
+
     private static final Logger LOG = LoggerFactory.getLogger(SocketRemoteSiteListener.class);
 
     public SocketRemoteSiteListener(final int socketPort, final SSLContext sslContext) {
         this(socketPort, sslContext, null);
     }
-    
+
     public SocketRemoteSiteListener(final int socketPort, final SSLContext sslContext, final NodeInformant nodeInformant) {
         this.socketPort = socketPort;
         this.sslContext = sslContext;
         this.nodeInformant = nodeInformant;
     }
 
-    
     @Override
     public void setRootGroup(final ProcessGroup rootGroup) {
         this.rootGroup.set(rootGroup);
     }
 
-    
     @Override
     public void start() throws IOException {
         final boolean secure = (sslContext != null);
-        
+
         final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.configureBlocking(true);
         serverSocketChannel.bind(new InetSocketAddress(socketPort));
         stopped.set(false);
-        
+
         final Thread listenerThread = new Thread(new Runnable() {
             private int threadCount = 0;
 
@@ -95,19 +94,21 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                     final ProcessGroup processGroup = rootGroup.get();
                     // If nodeInformant is not null, we are in clustered mode, which means that we don't care about
                     // the processGroup.
-                    if ( (nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty())) ) {
-                        try { Thread.sleep(2000L); } catch (final Exception e) {}
+                    if ((nodeInformant == null) && (processGroup == null || (processGroup.getInputPorts().isEmpty() && processGroup.getOutputPorts().isEmpty()))) {
+                        try {
+                            Thread.sleep(2000L);
+                        } catch (final Exception e) {
+                        }
                         continue;
                     }
-                    
-                    
+
                     LOG.trace("Accepting Connection...");
                     Socket acceptedSocket = null;
                     try {
                         serverSocketChannel.configureBlocking(false);
                         final ServerSocket serverSocket = serverSocketChannel.socket();
                         serverSocket.setSoTimeout(2000);
-                        while ( !stopped.get() && acceptedSocket == null ) {
+                        while (!stopped.get() && acceptedSocket == null) {
                             try {
                                 acceptedSocket = serverSocket.accept();
                             } catch (final SocketTimeoutException ste) {
@@ -116,14 +117,14 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                         }
                     } catch (final IOException e) {
                         LOG.error("RemoteSiteListener Unable to accept connection due to {}", e.toString());
-                        if ( LOG.isDebugEnabled() ) {
+                        if (LOG.isDebugEnabled()) {
                             LOG.error("", e);
                         }
                         continue;
                     }
                     LOG.trace("Got connection");
-                    
-                    if ( stopped.get() ) {
+
+                    if (stopped.get()) {
                         return;
                     }
                     final Socket socket = acceptedSocket;
@@ -135,25 +136,25 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                             final InetAddress inetAddress = socket.getInetAddress();
                             String hostname = inetAddress.getHostName();
                             final int slashIndex = hostname.indexOf("/");
-                            if ( slashIndex == 0 ) {
+                            if (slashIndex == 0) {
                                 hostname = hostname.substring(1);
-                            } else if ( slashIndex > 0 ) {
+                            } else if (slashIndex > 0) {
                                 hostname = hostname.substring(0, slashIndex);
                             }
 
                             final int port = socket.getPort();
                             final String peerUri = "nifi://" + hostname + ":" + port;
                             LOG.debug("{} Connection URL is {}", this, peerUri);
-                            
+
                             final CommunicationsSession commsSession;
                             final String dn;
                             try {
-                                if ( secure ) {
+                                if (secure) {
                                     final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslContext, socketChannel, false);
                                     LOG.trace("Channel is secure; connecting...");
                                     sslSocketChannel.connect();
                                     LOG.trace("Channel connected");
-                                    
+
                                     commsSession = new SSLSocketChannelCommunicationsSession(sslSocketChannel, peerUri);
                                     dn = sslSocketChannel.getDn();
                                     commsSession.setUserDn(dn);
@@ -164,7 +165,7 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                 }
                             } catch (final Exception e) {
                                 LOG.error("RemoteSiteListener Unable to accept connection from {} due to {}", socket, e.toString());
-                                if ( LOG.isDebugEnabled() ) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.error("", e);
                                 }
                                 try {
@@ -173,135 +174,136 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                                 }
                                 return;
                             }
-                            
+
                             LOG.info("Received connection from {}, User DN: {}", socket.getInetAddress(), dn);
-                            
+
                             final InputStream socketIn;
                             final OutputStream socketOut;
-                            
+
                             try {
                                 socketIn = commsSession.getInput().getInputStream();
                                 socketOut = commsSession.getOutput().getOutputStream();
                             } catch (final IOException e) {
-                            	LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
-                            	try {
-                            		commsSession.close();
-                            	} catch (final IOException ioe) {}
-                            	
-                            	return;
+                                LOG.error("Connection dropped from {} before any data was transmitted", peerUri);
+                                try {
+                                    commsSession.close();
+                                } catch (final IOException ioe) {
+                                }
+
+                                return;
                             }
-                            
+
                             final DataInputStream dis = new DataInputStream(socketIn);
-                        	final DataOutputStream dos = new DataOutputStream(socketOut);
-                        	
-                        	ServerProtocol protocol = null;
-                        	Peer peer = null;
+                            final DataOutputStream dos = new DataOutputStream(socketOut);
+
+                            ServerProtocol protocol = null;
+                            Peer peer = null;
                             try {
-                            	// ensure that we are communicating with another NiFi
+                                // ensure that we are communicating with another NiFi
                                 LOG.debug("Verifying magic bytes...");
-                            	verifyMagicBytes(dis, peerUri);
-
-                            	LOG.debug("Receiving Server Protocol Negotiation");
-                            	protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
-                            	protocol.setRootProcessGroup(rootGroup.get());
-                          	    protocol.setNodeInformant(nodeInformant);
-                            	
-                          	    final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
-                            	peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
-                            	LOG.debug("Handshaking....");
-                            	protocol.handshake(peer);
-                            	
-                            	if (!protocol.isHandshakeSuccessful()) {
-                            	    LOG.error("Handshake failed with {}; closing connection", peer);
-                            	    try {
-                            	        peer.close();
-                            	    } catch (final IOException e) {
-                            	        LOG.warn("Failed to close {} due to {}", peer, e);
-                            	    }
-                            	    
-                            	    // no need to shutdown protocol because we failed to perform handshake
-                            	    return;
-                            	}
-                            	
-                            	commsSession.setTimeout((int) protocol.getRequestExpiration());
-                            	
-                            	LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[] {
-                            	    protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer});
-                            	
-                        	    try {
-                        	        while (!protocol.isShutdown()) {
-                        	            LOG.trace("Getting Protocol Request Type...");
-                        	            
+                                verifyMagicBytes(dis, peerUri);
+
+                                LOG.debug("Receiving Server Protocol Negotiation");
+                                protocol = RemoteResourceFactory.receiveServerProtocolNegotiation(dis, dos);
+                                protocol.setRootProcessGroup(rootGroup.get());
+                                protocol.setNodeInformant(nodeInformant);
+
+                                final PeerDescription description = new PeerDescription("localhost", getPort(), sslContext != null);
+                                peer = new Peer(description, commsSession, peerUri, "nifi://localhost:" + getPort());
+                                LOG.debug("Handshaking....");
+                                protocol.handshake(peer);
+
+                                if (!protocol.isHandshakeSuccessful()) {
+                                    LOG.error("Handshake failed with {}; closing connection", peer);
+                                    try {
+                                        peer.close();
+                                    } catch (final IOException e) {
+                                        LOG.warn("Failed to close {} due to {}", peer, e);
+                                    }
+
+                                    // no need to shutdown protocol because we failed to perform handshake
+                                    return;
+                                }
+
+                                commsSession.setTimeout((int) protocol.getRequestExpiration());
+
+                                LOG.info("Successfully negotiated ServerProtocol {} Version {} with {}", new Object[]{
+                                    protocol.getResourceName(), protocol.getVersionNegotiator().getVersion(), peer});
+
+                                try {
+                                    while (!protocol.isShutdown()) {
+                                        LOG.trace("Getting Protocol Request Type...");
+
                                         int timeoutCount = 0;
                                         RequestType requestType = null;
-                                        
-                                        while ( requestType == null ) {
+
+                                        while (requestType == null) {
                                             try {
                                                 requestType = protocol.getRequestType(peer);
                                             } catch (final SocketTimeoutException e) {
                                                 // Give the timeout a bit longer (twice as long) to receive the Request Type,
                                                 // in order to attempt to receive more data without shutting down the socket if we don't
                                                 // have to.
-                                                LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[] {this, protocol, peer});
+                                                LOG.debug("{} Timed out waiting to receive RequestType using {} with {}", new Object[]{this, protocol, peer});
                                                 timeoutCount++;
                                                 requestType = null;
-                                                
-                                                if ( timeoutCount >= 2 ) {
+
+                                                if (timeoutCount >= 2) {
                                                     throw e;
                                                 }
                                             }
                                         }
-                                        
+
                                         LOG.debug("Request type from {} is {}", protocol, requestType);
-                                	    switch (requestType) {
-                                	        case NEGOTIATE_FLOWFILE_CODEC:
-                                	            protocol.negotiateCodec(peer);
-                                	            break;
-                                	        case RECEIVE_FLOWFILES:
-                                	            // peer wants to receive FlowFiles, so we will transfer FlowFiles.
-                                	            protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>());
-                                	            break;
-                                	        case SEND_FLOWFILES:
-                                	            // Peer wants to send FlowFiles, so we will receive.
+                                        switch (requestType) {
+                                            case NEGOTIATE_FLOWFILE_CODEC:
+                                                protocol.negotiateCodec(peer);
+                                                break;
+                                            case RECEIVE_FLOWFILES:
+                                                // peer wants to receive FlowFiles, so we will transfer FlowFiles.
+                                                protocol.getPort().transferFlowFiles(peer, protocol, new HashMap<String, String>());
+                                                break;
+                                            case SEND_FLOWFILES:
+                                                // Peer wants to send FlowFiles, so we will receive.
                                                 protocol.getPort().receiveFlowFiles(peer, protocol, new HashMap<String, String>());
-                                	            break;
-                                	        case REQUEST_PEER_LIST:
-                                	            protocol.sendPeerList(peer);
-                                	            break;
-                                	        case SHUTDOWN:
-                                	            protocol.shutdown(peer);
-                                	            break;
-                                	    }
-                        	        }
-                        	        LOG.debug("Finished communicating with {} ({})", peer, protocol);
-                        	    } catch (final Exception e) {
-                        	        LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString());
-                        	        if ( LOG.isDebugEnabled() ) {
-                        	            LOG.error("", e);
-                        	        }
-                        	    }
+                                                break;
+                                            case REQUEST_PEER_LIST:
+                                                protocol.sendPeerList(peer);
+                                                break;
+                                            case SHUTDOWN:
+                                                protocol.shutdown(peer);
+                                                break;
+                                        }
+                                    }
+                                    LOG.debug("Finished communicating with {} ({})", peer, protocol);
+                                } catch (final Exception e) {
+                                    LOG.error("Unable to communicate with remote instance {} ({}) due to {}; closing connection", peer, protocol, e.toString());
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.error("", e);
+                                    }
+                                }
                             } catch (final IOException e) {
                                 LOG.error("Unable to communicate with remote instance {} due to {}; closing connection", peer, e.toString());
-                                if ( LOG.isDebugEnabled() ) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.error("", e);
                                 }
                             } catch (final Throwable t) {
                                 LOG.error("Handshake failed when communicating with {}; closing connection. Reason for failure: {}", peerUri, t.toString());
-                                if ( LOG.isDebugEnabled() ) {
+                                if (LOG.isDebugEnabled()) {
                                     LOG.error("", t);
                                 }
                             } finally {
                                 LOG.trace("Cleaning up");
                                 try {
-                                    if ( protocol != null && peer != null ) {
+                                    if (protocol != null && peer != null) {
                                         protocol.shutdown(peer);
                                     }
                                 } catch (final Exception protocolException) {
                                     LOG.warn("Failed to shutdown protocol due to {}", protocolException.toString());
                                 }
-                                
+
                                 try {
-                                    if ( peer != null ) {
+                                    if (peer != null) {
                                         peer.close();
                                     }
                                 } catch (final Exception peerException) {
@@ -320,30 +322,30 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
         listenerThread.setName("Site-to-Site Listener");
         listenerThread.start();
     }
-    
+
     @Override
     public int getPort() {
         return socketPort;
     }
-    
+
     @Override
     public void stop() {
         stopped.set(true);
     }
-    
+
     private void verifyMagicBytes(final InputStream in, final String peerDescription) throws IOException, HandshakeException {
         final byte[] receivedMagicBytes = new byte[CommunicationsSession.MAGIC_BYTES.length];
 
         // expect magic bytes
         try {
-            for (int i=0; i < receivedMagicBytes.length; i++) {
+            for (int i = 0; i < receivedMagicBytes.length; i++) {
                 receivedMagicBytes[i] = (byte) in.read();
             }
         } catch (final EOFException e) {
             throw new HandshakeException("Handshake failed (not enough bytes) when communicating with " + peerDescription);
         }
-        
-        if ( !Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes) ) {
+
+        if (!Arrays.equals(CommunicationsSession.MAGIC_BYTES, receivedMagicBytes)) {
             throw new HandshakeException("Handshake with " + peerDescription + " failed because the Magic Header was not present");
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index eec6ed5..982d9ff 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -56,14 +56,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRemoteGroupPort extends RemoteGroupPort {
+
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     public static final String USER_AGENT = "NiFi-Site-to-Site";
     public static final String CONTENT_TYPE = "application/octet-stream";
-    
+
     public static final int GZIP_COMPRESSION_LEVEL = 1;
-    
+
     private static final String CATEGORY = "Site to Site";
-    
+
     private static final Logger logger = LoggerFactory.getLogger(StandardRemoteGroupPort.class);
     private final RemoteProcessGroup remoteGroup;
     private final AtomicBoolean useCompression = new AtomicBoolean(false);
@@ -71,28 +72,27 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     private final AtomicBoolean targetRunning = new AtomicBoolean(true);
     private final SSLContext sslContext;
     private final TransferDirection transferDirection;
-    
+
     private final AtomicReference<SiteToSiteClient> clientRef = new AtomicReference<>();
-    
-    
-    public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup, 
+
+    public StandardRemoteGroupPort(final String id, final String name, final ProcessGroup processGroup, final RemoteProcessGroup remoteGroup,
             final TransferDirection direction, final ConnectableType type, final SSLContext sslContext, final ProcessScheduler scheduler) {
         // remote group port id needs to be unique but cannot just be the id of the port
         // in the remote group instance. this supports referencing the same remote
         // instance more than once.
         super(id, name, processGroup, type, scheduler);
-        
+
         this.remoteGroup = remoteGroup;
         this.transferDirection = direction;
         this.sslContext = sslContext;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
     }
-    
+
     private static File getPeerPersistenceFile(final String portId) {
         final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory();
         return new File(stateDir, portId + ".peers");
     }
-    
+
     @Override
     public boolean isTargetRunning() {
         return targetRunning.get();
@@ -101,18 +101,18 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     public void setTargetRunning(boolean targetRunning) {
         this.targetRunning.set(targetRunning);
     }
-    
+
     @Override
     public boolean isTriggerWhenEmpty() {
         return getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT;
     }
-    
+
     @Override
     public void shutdown() {
-    	super.shutdown();
-        
+        super.shutdown();
+
         final SiteToSiteClient client = clientRef.get();
-        if ( client != null ) {
+        if (client != null) {
             try {
                 client.close();
             } catch (final IOException ioe) {
@@ -120,58 +120,57 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             }
         }
     }
-    
+
     @Override
     public void onSchedulingStart() {
         super.onSchedulingStart();
-        
+
         final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url(remoteGroup.getTargetUri().toString())
-            .portIdentifier(getIdentifier())
-            .sslContext(sslContext)
-            .eventReporter(remoteGroup.getEventReporter())
-            .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
-            .build();
+                .url(remoteGroup.getTargetUri().toString())
+                .portIdentifier(getIdentifier())
+                .sslContext(sslContext)
+                .eventReporter(remoteGroup.getEventReporter())
+                .peerPersistenceFile(getPeerPersistenceFile(getIdentifier()))
+                .build();
         clientRef.set(client);
     }
-    
-    
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        if ( !remoteGroup.isTransmitting() ) {
+        if (!remoteGroup.isTransmitting()) {
             logger.debug("{} {} is not transmitting; will not send/receive", this, remoteGroup);
             return;
         }
 
-        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0 ) {
+        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && session.getQueueSize().getObjectCount() == 0) {
             logger.debug("{} No data to send", this);
             return;
         }
-        
+
         String url = getRemoteProcessGroup().getTargetUri().toString();
-        
+
         // If we are sending data, we need to ensure that we have at least 1 FlowFile to send. Otherwise,
         // we don't want to create a transaction at all.
         final FlowFile firstFlowFile;
-        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
             firstFlowFile = session.get();
-            if ( firstFlowFile == null ) {
+            if (firstFlowFile == null) {
                 return;
             }
         } else {
             firstFlowFile = null;
         }
-        
+
         final SiteToSiteClient client = clientRef.get();
         final Transaction transaction;
         try {
-        	transaction = client.createTransaction(transferDirection);
+            transaction = client.createTransaction(transferDirection);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);
             final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port is not in a valid state", this, url);
             logger.error(message);
-          	session.rollback();
+            session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
         } catch (final UnknownPortException e) {
@@ -179,22 +178,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             this.targetExists.set(false);
             final String message = String.format("%s failed to communicate with %s because the remote instance indicates that the port no longer exists", this, url);
             logger.error(message);
-          	session.rollback();
+            session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
         } catch (final IOException e) {
-        	context.yield();
+            context.yield();
             final String message = String.format("%s failed to communicate with %s due to %s", this, url, e.toString());
             logger.error(message);
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.error("", e);
             }
-          	session.rollback();
+            session.rollback();
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             return;
         }
-        
-        if ( transaction == null ) {
+
+        if (transaction == null) {
             logger.debug("{} Unable to create transaction to communicate with; all peers must be penalized, so yielding context", this);
             session.rollback();
             context.yield();
@@ -202,11 +201,11 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         }
 
         try {
-            if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT ) {
+            if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) {
                 transferFlowFiles(transaction, context, session, firstFlowFile);
             } else {
                 final int numReceived = receiveFlowFiles(transaction, context, session);
-                if ( numReceived == 0 ) {
+                if (numReceived == 0) {
                     context.yield();
                 }
             }
@@ -215,24 +214,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         } catch (final Throwable t) {
             final String message = String.format("%s failed to communicate with remote NiFi instance due to %s", this, t.toString());
             logger.error("{} failed to communicate with remote NiFi instance due to {}", this, t.toString());
-            if ( logger.isDebugEnabled() ) {
+            if (logger.isDebugEnabled()) {
                 logger.error("", t);
             }
-            
+
             remoteGroup.getEventReporter().reportEvent(Severity.ERROR, CATEGORY, message);
             transaction.error();
             session.rollback();
         }
     }
 
-    
     @Override
     public String getYieldPeriod() {
         // delegate yield duration to remote process group
         return remoteGroup.getYieldDuration();
     }
-    
-    
+
     private int transferFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session, FlowFile firstFlowFile) throws IOException, ProtocolException {
         FlowFile flowFile = firstFlowFile;
 
@@ -241,7 +238,7 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
             final long startSendingNanos = System.nanoTime();
             final StopWatch stopWatch = new StopWatch(true);
             long bytesSent = 0L;
-            
+
             final Set<FlowFile> flowFilesSent = new HashSet<>();
             boolean continueTransaction = true;
             while (continueTransaction) {
@@ -255,79 +252,78 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
                         transaction.send(dataPacket);
                     }
                 });
-                
+
                 final long transferNanos = System.nanoTime() - startNanos;
                 final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
-                
+
                 flowFilesSent.add(flowFile);
                 bytesSent += flowFile.getSize();
                 logger.debug("{} Sent {} to {}", this, flowFile, transaction.getCommunicant().getUrl());
-                
+
                 final String transitUri = transaction.getCommunicant().getUrl() + "/" + flowFile.getAttribute(CoreAttributes.UUID.key());
                 session.getProvenanceReporter().send(flowFile, transitUri, "Remote DN=" + userDn, transferMillis, false);
                 session.remove(flowFile);
-                
+
                 final long sendingNanos = System.nanoTime() - startSendingNanos;
-                if ( sendingNanos < BATCH_SEND_NANOS ) { 
+                if (sendingNanos < BATCH_SEND_NANOS) {
                     flowFile = session.get();
                 } else {
                     flowFile = null;
                 }
-                
+
                 continueTransaction = (flowFile != null);
             }
-            
+
             transaction.confirm();
-            
+
             // consume input stream entirely, ignoring its contents. If we
             // don't do this, the Connection will not be returned to the pool
             stopWatch.stop();
             final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
             final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesSent);
-            
+
             session.commit();
             transaction.complete();
-            
+
             final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+            logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
                 this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
-            
+
             return flowFilesSent.size();
         } catch (final Exception e) {
             session.rollback();
             throw e;
         }
 
-        
     }
-    
+
     private int receiveFlowFiles(final Transaction transaction, final ProcessContext context, final ProcessSession session) throws IOException, ProtocolException {
         final String userDn = transaction.getCommunicant().getDistinguishedName();
-        
+
         final StopWatch stopWatch = new StopWatch(true);
         final Set<FlowFile> flowFilesReceived = new HashSet<>();
         long bytesReceived = 0L;
-        
+
         while (true) {
             final long start = System.nanoTime();
             final DataPacket dataPacket = transaction.receive();
-            if ( dataPacket == null ) {
+            if (dataPacket == null) {
                 break;
             }
-            
+
             FlowFile flowFile = session.create();
             flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             final long receiveNanos = System.nanoTime() - start;
-            
+
             String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
-            if ( sourceFlowFileIdentifier == null ) {
+            if (sourceFlowFileIdentifier == null) {
                 sourceFlowFileIdentifier = "<Unknown Identifier>";
             }
-            
+
             final String transitUri = transaction.getCommunicant().getUrl() + sourceFlowFileIdentifier;
-            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, 
+            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier,
                     "Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos));
 
             session.transfer(flowFile, Relationship.ANONYMOUS);
@@ -336,22 +332,22 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
 
         // Confirm that what we received was the correct data.
         transaction.confirm();
-        
+
         // Commit the session so that we have persisted the data
         session.commit();
 
         transaction.complete();
 
-        if ( !flowFilesReceived.isEmpty() ) {
+        if (!flowFilesReceived.isEmpty()) {
             stopWatch.stop();
             final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
             final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
             final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
             final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-            logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { 
-                    this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate });
+            logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
+                this, flowFileDescription, dataSize, transaction.getCommunicant().getUrl(), uploadMillis, uploadDataRate});
         }
-        
+
         return flowFilesReceived.size();
     }
 
@@ -371,44 +367,44 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         ValidationResult error = null;
         if (!targetExists.get()) {
             error = new ValidationResult.Builder()
-                .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
-                .subject(String.format("Remote port '%s'", getName()))
-                .valid(false)
-                .build();
-        } else if ( getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty() ) {
+                    .explanation(String.format("Remote instance indicates that port '%s' no longer exists.", getName()))
+                    .subject(String.format("Remote port '%s'", getName()))
+                    .valid(false)
+                    .build();
+        } else if (getConnectableType() == ConnectableType.REMOTE_OUTPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) {
             error = new ValidationResult.Builder()
-                .explanation(String.format("Port '%s' has no outbound connections", getName()))
-                .subject(String.format("Remote port '%s'", getName()))
-                .valid(false)
-                .build();
+                    .explanation(String.format("Port '%s' has no outbound connections", getName()))
+                    .subject(String.format("Remote port '%s'", getName()))
+                    .valid(false)
+                    .build();
         }
-        
-        if ( error != null ) {
+
+        if (error != null) {
             validationErrors.add(error);
         }
-        
+
         return validationErrors;
     }
-    
+
     @Override
     public void verifyCanStart() {
         super.verifyCanStart();
-        
-        if ( getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty() ) {
+
+        if (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT && getIncomingConnections().isEmpty()) {
             throw new IllegalStateException("Port " + getName() + " has no incoming connections");
         }
     }
-    
+
     @Override
     public void setUseCompression(final boolean useCompression) {
         this.useCompression.set(useCompression);
     }
-    
+
     @Override
     public boolean isUseCompression() {
         return useCompression.get();
     }
-    
+
     @Override
     public String toString() {
         return "RemoteGroupPort[name=" + getName() + ",target=" + remoteGroup.getTargetUri().toString() + "]";
@@ -418,34 +414,32 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
     public RemoteProcessGroup getRemoteProcessGroup() {
         return remoteGroup;
     }
-    
+
     @Override
     public TransferDirection getTransferDirection() {
         return (getConnectableType() == ConnectableType.REMOTE_INPUT_PORT) ? TransferDirection.SEND : TransferDirection.RECEIVE;
     }
-    
+
     public void setTargetExists(final boolean exists) {
         this.targetExists.set(exists);
     }
-    
+
     @Override
     public void removeConnection(final Connection connection) throws IllegalArgumentException, IllegalStateException {
         super.removeConnection(connection);
-        
-        // If the Port no longer exists on the remote instance and this is the last Connection, tell 
+
+        // If the Port no longer exists on the remote instance and this is the last Connection, tell
         // RemoteProcessGroup to remove me
-        if ( !getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty() ) {
+        if (!getTargetExists() && !hasIncomingConnection() && getConnections().isEmpty()) {
             remoteGroup.removeNonExistentPort(this);
         }
     }
-    
-    
+
     @Override
     public SchedulingStrategy getSchedulingStrategy() {
         return SchedulingStrategy.TIMER_DRIVEN;
     }
-    
-    
+
     @Override
     public boolean isSideEffectFree() {
         return false;