You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/21 08:18:48 UTC

[02/12] incubator-nifi git commit: NIFI-271 checkpoint push because there are so many changes. Long way to go but got through dto library

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 021531f..67f28d2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -68,10 +68,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StandardRootGroupPort extends AbstractPort implements RootGroupPort {
+
     private static final String CATEGORY = "Site to Site";
-    
+
     private static final Logger logger = LoggerFactory.getLogger(StandardRootGroupPort.class);
-    
+
     private final AtomicReference<Set<String>> groupAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
     private final AtomicReference<Set<String>> userAccessControl = new AtomicReference<Set<String>>(new HashSet<String>());
     private final ProcessScheduler processScheduler;
@@ -82,18 +83,18 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
     private final EventReporter eventReporter;
     private final ProcessScheduler scheduler;
     private final Set<Relationship> relationships;
-    
+
     private final BlockingQueue<FlowFileRequest> requestQueue = new ArrayBlockingQueue<>(1000);
-    
+
     private final Set<FlowFileRequest> activeRequests = new HashSet<>();
     private final Lock requestLock = new ReentrantLock();
     private boolean shutdown = false;   // guarded by requestLock
 
-    public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup, 
+    public StandardRootGroupPort(final String id, final String name, final ProcessGroup processGroup,
             final TransferDirection direction, final ConnectableType type, final UserService userService,
             final BulletinRepository bulletinRepository, final ProcessScheduler scheduler, final boolean secure) {
         super(id, name, processGroup, type, scheduler);
-        
+
         this.processScheduler = scheduler;
         setScheduldingPeriod(MINIMUM_SCHEDULING_NANOS + " nanos");
         this.userService = userService;
@@ -110,20 +111,20 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
                 bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
             }
         };
-        
+
         relationships = direction == TransferDirection.RECEIVE ? Collections.singleton(AbstractPort.PORT_RELATIONSHIP) : Collections.<Relationship>emptySet();
     }
-    
+
     @Override
     public Collection<Relationship> getRelationships() {
-    	return relationships;
+        return relationships;
     }
-    
+
     @Override
     public boolean isTriggerWhenEmpty() {
         return true;
     }
-    
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
         final FlowFileRequest flowFileRequest;
@@ -132,29 +133,29 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
         } catch (final InterruptedException ie) {
             return;
         }
-        
-        if ( flowFileRequest == null ) {
+
+        if (flowFileRequest == null) {
             return;
         }
 
         flowFileRequest.setServiceBegin();
-        
+
         requestLock.lock();
         try {
-            if ( shutdown ) {
+            if (shutdown) {
                 final CommunicationsSession commsSession = flowFileRequest.getPeer().getCommunicationsSession();
-                if ( commsSession != null ) {
+                if (commsSession != null) {
                     commsSession.interrupt();
                 }
             }
-            
+
             activeRequests.add(flowFileRequest);
         } finally {
             requestLock.unlock();
         }
-        
+
         final ProcessSession session = sessionFactory.createSession();
-        
+
         try {
             onTrigger(context, session, flowFileRequest);
             // we leave the session open, because we send it back to the caller of #receiveFlowFile or #transmitFlowFile,
@@ -162,11 +163,11 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
         } catch (final TransmissionDisabledException e) {
             session.rollback();
         } catch (final Exception e) {
-            logger.error("{} Failed to process data due to {}", new Object[] {this, e});
-            if ( logger.isDebugEnabled() ) {
+            logger.error("{} Failed to process data due to {}", new Object[]{this, e});
+            if (logger.isDebugEnabled()) {
                 logger.error("", e);
             }
-            
+
             session.rollback();
         } finally {
             requestLock.lock();
@@ -177,50 +178,50 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
             }
         }
     }
-    
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         // nothing to do here -- we will never get called because we override onTrigger(ProcessContext, ProcessSessionFactory)
     }
-    
+
     private void onTrigger(final ProcessContext context, final ProcessSession session, final FlowFileRequest flowFileRequest) {
         final ServerProtocol protocol = flowFileRequest.getProtocol();
         final BlockingQueue<ProcessingResult> responseQueue = flowFileRequest.getResponseQueue();
-        if ( flowFileRequest.isExpired() ) {
+        if (flowFileRequest.isExpired()) {
             final String message = String.format("%s Cannot service request from %s because the request has timed out", this, flowFileRequest.getPeer());
             logger.warn(message);
             eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
-            
+
             responseQueue.add(new ProcessingResult(new RequestExpiredException()));
             return;
         }
-        
+
         final Peer peer = flowFileRequest.getPeer();
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final String sourceDn = commsSession.getUserDn();
         logger.debug("{} Servicing request for {} (DN={})", this, peer, sourceDn);
-        
+
         final PortAuthorizationResult authorizationResult = checkUserAuthorization(sourceDn);
-        if ( !authorizationResult.isAuthorized() ) {
-            final String message = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s", 
-                this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), authorizationResult.getExplanation());
+        if (!authorizationResult.isAuthorized()) {
+            final String message = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s",
+                    this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), authorizationResult.getExplanation());
             logger.error(message);
             eventReporter.reportEvent(Severity.ERROR, CATEGORY, message);
-            
+
             responseQueue.add(new ProcessingResult(new NotAuthorizedException(authorizationResult.getExplanation())));
             return;
         }
 
         final FlowFileCodec codec = protocol.getPreNegotiatedCodec();
-        if ( codec == null ) {
+        if (codec == null) {
             responseQueue.add(new ProcessingResult(new BadRequestException("None of the supported FlowFile Codecs supplied is compatible with this instance")));
             return;
         }
 
         final int transferCount;
-        
+
         try {
-            if ( getConnectableType() == ConnectableType.INPUT_PORT ) {
+            if (getConnectableType() == ConnectableType.INPUT_PORT) {
                 transferCount = receiveFlowFiles(context, session, codec, flowFileRequest);
             } else {
                 transferCount = transferFlowFiles(context, session, codec, flowFileRequest);
@@ -233,58 +234,55 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
         } catch (final Exception e) {
             session.rollback();
             responseQueue.add(new ProcessingResult(e));
-            
+
             return;
         }
-        
+
         session.commit();
         responseQueue.add(new ProcessingResult(transferCount));
     }
 
-    
     private int transferFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest request) throws IOException, ProtocolException {
         return request.getProtocol().transferFlowFiles(request.getPeer(), context, session, codec);
     }
 
-    
     private int receiveFlowFiles(final ProcessContext context, final ProcessSession session, final FlowFileCodec codec, final FlowFileRequest receiveRequest) throws IOException, ProtocolException {
         return receiveRequest.getProtocol().receiveFlowFiles(receiveRequest.getPeer(), context, session, codec);
     }
-    
+
     @Override
     public boolean isValid() {
         return (getConnectableType() == ConnectableType.INPUT_PORT) ? !getConnections(Relationship.ANONYMOUS).isEmpty() : true;
     }
-    
+
     @Override
     public Collection<ValidationResult> getValidationErrors() {
         final Collection<ValidationResult> validationErrors = new ArrayList<>();
         if (!isValid()) {
             final ValidationResult error = new ValidationResult.Builder()
-                .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
-                .subject(String.format("Port '%s'", getName()))
-                .valid(false)
-                .build();
+                    .explanation(String.format("Output connection for port '%s' is not defined.", getName()))
+                    .subject(String.format("Port '%s'", getName()))
+                    .valid(false)
+                    .build();
             validationErrors.add(error);
         }
         return validationErrors;
     }
-    
-    
+
     @Override
     public boolean isTransmitting() {
-        if ( !isRunning() ) {
+        if (!isRunning()) {
             return false;
         }
-        
-        if ( processScheduler.getActiveThreadCount(this) > 0 ) {
+
+        if (processScheduler.getActiveThreadCount(this) > 0) {
             return true;
         }
-                
-        if ( requestQueue.isEmpty() ) {
+
+        if (requestQueue.isEmpty()) {
             return false;
         }
-        
+
         requestLock.lock();
         try {
             return !activeRequests.isEmpty();
@@ -316,14 +314,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
     @Override
     public void shutdown() {
         super.shutdown();
-        
+
         requestLock.lock();
         try {
             this.shutdown = true;
-            
-            for ( final FlowFileRequest request : activeRequests ) {
+
+            for (final FlowFileRequest request : activeRequests) {
                 final CommunicationsSession commsSession = request.getPeer().getCommunicationsSession();
-                if ( commsSession != null ) {
+                if (commsSession != null) {
                     commsSession.interrupt();
                 }
             }
@@ -331,11 +329,11 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
             requestLock.unlock();
         }
     }
-    
+
     @Override
     public void onSchedulingStart() {
         super.onSchedulingStart();
-        
+
         requestLock.lock();
         try {
             shutdown = false;
@@ -343,14 +341,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
             requestLock.unlock();
         }
     }
-    
+
     @Override
     public PortAuthorizationResult checkUserAuthorization(final String dn) {
-        if ( !secure ) {
+        if (!secure) {
             return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure");
         }
 
-        if ( dn == null ) {
+        if (dn == null) {
             final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn);
             logger.warn(message);
             eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
@@ -359,9 +357,9 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
 
         try {
             final NiFiUser user = userService.checkAuthorization(dn);
-            
+
             final Set<Authority> authorities = user.getAuthorities();
-            if ( !authorities.contains(Authority.ROLE_NIFI) ) {
+            if (!authorities.contains(Authority.ROLE_NIFI)) {
                 final String message = String.format("%s authorization failed for user %s because the user does not have Role NiFi", this, dn);
                 logger.warn(message);
                 eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
@@ -369,12 +367,12 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
             }
 
             final Set<String> allowedUsers = userAccessControl.get();
-            if ( allowedUsers.contains(dn) ) {
+            if (allowedUsers.contains(dn)) {
                 return new StandardPortAuthorizationResult(true, "User is Authorized");
             }
 
             final String userGroup = user.getUserGroup();
-            if ( userGroup == null ) {
+            if (userGroup == null) {
                 final String message = String.format("%s authorization failed for user %s because the user does not have a group and is not in the set of Allowed Users for this Port", this, dn);
                 logger.warn(message);
                 eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
@@ -383,13 +381,14 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
 
             final Set<String> allowedGroups = groupAccessControl.get();
             final boolean allowed = allowedGroups.contains(userGroup);
-            if ( !allowed ) {
-                final String message = String.format("%s authorization failed for user %s because the user is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn);
+            if (!allowed) {
+                final String message = String.format("%s authorization failed for user %s because the user "
+                        + "is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn);
                 logger.warn(message);
                 eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
                 return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + this.toString());
             }
-            
+
             return new StandardPortAuthorizationResult(true, "User is part of group '" + userGroup + "', which is Authorized to communicate with " + this.toString());
         } catch (final AccountNotFoundException anfe) {
             final String message = String.format("%s authorization failed for user %s because the DN is unknown", this, dn);
@@ -418,145 +417,145 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
             return new StandardPortAuthorizationResult(false, "Authorization failed because " + e);
         }
     }
-    
+
     public static class StandardPortAuthorizationResult implements PortAuthorizationResult {
+
         private final boolean isAuthorized;
         private final String explanation;
-        
+
         public StandardPortAuthorizationResult(final boolean isAuthorized, final String explanation) {
             this.isAuthorized = isAuthorized;
             this.explanation = explanation;
         }
-        
+
         @Override
         public boolean isAuthorized() {
             return isAuthorized;
         }
-        
+
         @Override
         public String getExplanation() {
             return explanation;
         }
     }
-    
-    
+
     private static class ProcessingResult {
+
         private final int fileCount;
         private final Exception problem;
-        
+
         public ProcessingResult(final int fileCount) {
             this.fileCount = fileCount;
             this.problem = null;
         }
-        
+
         public ProcessingResult(final Exception problem) {
             this.fileCount = 0;
             this.problem = problem;
         }
-        
+
         public Exception getProblem() {
             return problem;
         }
-        
+
         public int getFileCount() {
             return fileCount;
         }
     }
-    
 
-    
     private static class FlowFileRequest {
+
         private final Peer peer;
         private final ServerProtocol protocol;
         private final BlockingQueue<ProcessingResult> queue;
         private final long creationTime;
         private final AtomicBoolean beingServiced = new AtomicBoolean(false);
-        
+
         public FlowFileRequest(final Peer peer, final ServerProtocol protocol) {
             this.creationTime = System.currentTimeMillis();
             this.peer = peer;
             this.protocol = protocol;
             this.queue = new ArrayBlockingQueue<>(1);
         }
-        
-        
+
         public void setServiceBegin() {
             this.beingServiced.set(true);
         }
-        
+
         public boolean isBeingServiced() {
             return beingServiced.get();
         }
-        
+
         public BlockingQueue<ProcessingResult> getResponseQueue() {
             return queue;
         }
-        
+
         public Peer getPeer() {
             return peer;
         }
-        
+
         public ServerProtocol getProtocol() {
             return protocol;
         }
-        
+
         public boolean isExpired() {
             // use double the protocol's expiration because the sender may send data for a bit before
             // the timeout starts being counted, and we don't want to timeout before the sender does.
             // is this a good idea...???
             long expiration = protocol.getRequestExpiration() * 2;
-            if ( expiration <= 0L ) {
+            if (expiration <= 0L) {
                 return false;
             }
-            
+
             if (expiration < 500L) {
                 expiration = 500L;
             }
-  
+
             return System.currentTimeMillis() > creationTime + expiration;
         }
     }
 
-
     @Override
-    public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
-        if ( getConnectableType() != ConnectableType.INPUT_PORT ) {
+    public int receiveFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders)
+            throws NotAuthorizedException, BadRequestException, RequestExpiredException {
+        if (getConnectableType() != ConnectableType.INPUT_PORT) {
             throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port");
         }
 
-        if ( !this.isRunning() ) {
+        if (!this.isRunning()) {
             throw new IllegalStateException("Port not running");
         }
-        
+
         try {
             final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol);
-            if ( !this.requestQueue.offer(request) ) {
+            if (!this.requestQueue.offer(request)) {
                 throw new RequestExpiredException();
             }
-            
+
             // Trigger this port to run.
             scheduler.registerEvent(this);
-            
+
             // Get a response from the response queue but don't wait forever if the port is stopped
             ProcessingResult result = null;
-            
+
             // wait for the request to start getting serviced... and time out if it doesn't happen
             // before the request expires
-            while ( !request.isBeingServiced() ) {
-                if ( request.isExpired() ) {
+            while (!request.isBeingServiced()) {
+                if (request.isExpired()) {
                     throw new SocketTimeoutException("Read timed out");
                 } else {
                     try {
                         Thread.sleep(100L);
-                    } catch (final InterruptedException e) {}
+                    } catch (final InterruptedException e) {
+                    }
                 }
             }
 
             // we've started to service the request. Now just wait until it's finished
             result = request.getResponseQueue().take();
-            
+
             final Exception problem = result.getProblem();
-            if ( problem == null ) {
+            if (problem == null) {
                 return result.getFileCount();
             } else {
                 throw problem;
@@ -571,44 +570,46 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
     }
 
     @Override
-    public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
-        if ( getConnectableType() != ConnectableType.OUTPUT_PORT ) {
+    public int transferFlowFiles(final Peer peer, final ServerProtocol serverProtocol, final Map<String, String> requestHeaders)
+            throws NotAuthorizedException, BadRequestException, RequestExpiredException {
+        if (getConnectableType() != ConnectableType.OUTPUT_PORT) {
             throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port");
         }
-        
-        if ( !this.isRunning() ) {
+
+        if (!this.isRunning()) {
             throw new IllegalStateException("Port not running");
         }
 
         try {
             final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol);
-            if ( !this.requestQueue.offer(request) ) {
+            if (!this.requestQueue.offer(request)) {
                 throw new RequestExpiredException();
             }
 
             // Trigger this port to run
             scheduler.registerEvent(this);
-            
+
             // Get a response from the response queue but don't wait forever if the port is stopped
             ProcessingResult result = null;
-            
+
             // wait for the request to start getting serviced... and time out if it doesn't happen
             // before the request expires
-            while ( !request.isBeingServiced() ) {
-                if ( request.isExpired() ) {
+            while (!request.isBeingServiced()) {
+                if (request.isExpired()) {
                     throw new SocketTimeoutException("Read timed out");
-                } else { 
+                } else {
                     try {
                         Thread.sleep(100L);
-                    } catch (final InterruptedException e) {}
+                    } catch (final InterruptedException e) {
+                    }
                 }
             }
 
             // we've started to service the request. Now just wait until it's finished
             result = request.getResponseQueue().take();
-            
+
             final Exception problem = result.getProblem();
-            if ( problem == null ) {
+            if (problem == null) {
                 return result.getFileCount();
             } else {
                 throw problem;
@@ -621,12 +622,12 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
             throw new ProcessException(e);
         }
     }
-    
+
     @Override
     public SchedulingStrategy getSchedulingStrategy() {
         return SchedulingStrategy.TIMER_DRIVEN;
     }
-    
+
     @Override
     public boolean isSideEffectFree() {
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
index 926809c..4a4f96b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java
@@ -19,9 +19,10 @@ package org.apache.nifi.remote.exception;
 import org.apache.nifi.remote.codec.FlowFileCodec;
 
 public class UnsupportedCodecException extends RuntimeException {
-	private static final long serialVersionUID = 198234789237L;
 
-	public UnsupportedCodecException(final String codecName) {
+    private static final long serialVersionUID = 198234789237L;
+
+    public UnsupportedCodecException(final String codecName) {
         super("Codec " + codecName + " is not supported");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
index 391d52b..7d0ffab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java
@@ -40,12 +40,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClusterManagerServerProtocol implements ServerProtocol {
+
     public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
 
     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
     private final Logger logger = LoggerFactory.getLogger(ClusterManagerServerProtocol.class);
     private NodeInformant nodeInformant;
-    
+
     private String commsIdentifier;
     private boolean shutdown = false;
     private boolean handshakeCompleted = false;
@@ -53,52 +54,51 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
 
     public ClusterManagerServerProtocol() {
     }
-    
-    
+
     @Override
     public void setNodeInformant(final NodeInformant nodeInformant) {
         this.nodeInformant = nodeInformant;
     }
-    
+
     @Override
     public void handshake(final Peer peer) throws IOException, HandshakeException {
-        if ( handshakeCompleted ) {
+        if (handshakeCompleted) {
             throw new IllegalStateException("Handshake has already been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        
+
         // read communications identifier
         commsIdentifier = dis.readUTF();
-        
+
         // read all of the properties. we don't really care what the properties are.
         final int numProperties = dis.readInt();
-        for (int i=0; i < numProperties; i++) {
+        for (int i = 0; i < numProperties; i++) {
             final String propertyName = dis.readUTF();
             final String propertyValue = dis.readUTF();
-            
+
             final HandshakeProperty property;
             try {
                 property = HandshakeProperty.valueOf(propertyName);
-                if ( HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property) ) {
+                if (HandshakeProperty.REQUEST_EXPIRATION_MILLIS.equals(property)) {
                     requestExpirationMillis = Long.parseLong(propertyValue);
                 }
             } catch (final Exception e) {
             }
         }
-        
+
         // send "OK" response
         ResponseCode.PROPERTIES_OK.writeResponse(dos);
-        
+
         logger.debug("Successfully completed handshake with {}; CommsID={}", peer, commsIdentifier);
         handshakeCompleted = true;
     }
-    
+
     @Override
     public boolean isHandshakeSuccessful() {
         return handshakeCompleted;
@@ -106,10 +106,10 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
 
     @Override
     public void sendPeerList(final Peer peer) throws IOException {
-        if ( !handshakeCompleted ) {
+        if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
@@ -118,29 +118,29 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
 
         final ClusterNodeInformation clusterNodeInfo = nodeInformant.getNodeInformation();
         final Collection<NodeInformation> nodeInfos = clusterNodeInfo.getNodeInformation();
-        
+
         // determine how many nodes have Site-to-site enabled
         int numPeers = 0;
-        for ( final NodeInformation nodeInfo : nodeInfos ) {
+        for (final NodeInformation nodeInfo : nodeInfos) {
             if (nodeInfo.getSiteToSitePort() != null) {
                 numPeers++;
             }
         }
-        
+
         dos.writeInt(numPeers);
-        for ( final NodeInformation nodeInfo : nodeInfos ) {
-            if ( nodeInfo.getSiteToSitePort() == null ) {
+        for (final NodeInformation nodeInfo : nodeInfos) {
+            if (nodeInfo.getSiteToSitePort() == null) {
                 continue;
             }
-            
+
             dos.writeUTF(nodeInfo.getHostname());
             dos.writeInt(nodeInfo.getSiteToSitePort());
             dos.writeBoolean(nodeInfo.isSiteToSiteSecure());
             dos.writeInt(nodeInfo.getTotalFlowFiles());
         }
-        
+
         logger.info("Redirected {} to {} nodes", peer, numPeers);
-        
+
         dos.flush();
     }
 
@@ -153,7 +153,7 @@ public class ClusterManagerServerProtocol implements ServerProtocol {
     public boolean isShutdown() {
         return shutdown;
     }
-    
+
     @Override
     public FlowFileCodec negotiateCodec(Peer peer) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 21de646..b931e26 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -65,43 +65,42 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketFlowFileServerProtocol implements ServerProtocol {
+
     public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
-    
+
     private ProcessGroup rootGroup;
     private String commsIdentifier;
     private boolean handshakeCompleted;
-    
+
     private Boolean useGzip;
     private long requestExpirationMillis;
     private RootGroupPort port;
     private boolean shutdown = false;
     private FlowFileCodec negotiatedFlowFileCodec = null;
     private String transitUriPrefix = null;
-    
+
     private int requestedBatchCount = 0;
     private long requestedBatchBytes = 0L;
     private long requestedBatchNanos = 0L;
     private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
-    
+
     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
     private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
-    
 
-    
     @Override
     public void setRootProcessGroup(final ProcessGroup group) {
-        if ( !group.isRootGroup() ) {
+        if (!group.isRootGroup()) {
             throw new IllegalArgumentException();
         }
         this.rootGroup = group;
     }
-    
+
     @Override
     public void handshake(final Peer peer) throws IOException, HandshakeException {
-        if ( handshakeCompleted ) {
+        if (handshakeCompleted) {
             throw new IllegalStateException("Handshake has already been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
@@ -109,30 +108,30 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        
+
         commsIdentifier = dis.readUTF();
-        
-        if ( versionNegotiator.getVersion() >= 3 ) {
+
+        if (versionNegotiator.getVersion() >= 3) {
             transitUriPrefix = dis.readUTF();
-            if ( !transitUriPrefix.endsWith("/") ) {
+            if (!transitUriPrefix.endsWith("/")) {
                 transitUriPrefix = transitUriPrefix + "/";
             }
         }
-        
+
         final Map<String, String> properties = new HashMap<>();
         final int numProperties = dis.readInt();
-        for (int i=0; i < numProperties; i++) {
+        for (int i = 0; i < numProperties; i++) {
             final String propertyName = dis.readUTF();
             final String propertyValue = dis.readUTF();
             properties.put(propertyName, propertyValue);
         }
-        
+
         // evaluate the properties received
         boolean responseWritten = false;
-        for ( final Map.Entry<String, String> entry : properties.entrySet() ) {
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
             final String propertyName = entry.getKey();
             final String value = entry.getValue();
-            
+
             final HandshakeProperty property;
             try {
                 property = HandshakeProperty.valueOf(propertyName);
@@ -140,7 +139,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                 ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName);
                 throw new HandshakeException("Received unknown property: " + propertyName);
             }
-            
+
             try {
                 switch (property) {
                     case GZIP: {
@@ -152,66 +151,66 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                         break;
                     case BATCH_COUNT:
                         requestedBatchCount = Integer.parseInt(value);
-                        if ( requestedBatchCount < 0 ) {
+                        if (requestedBatchCount < 0) {
                             throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value);
                         }
                         break;
                     case BATCH_SIZE:
                         requestedBatchBytes = Long.parseLong(value);
-                        if ( requestedBatchBytes < 0 ) {
+                        if (requestedBatchBytes < 0) {
                             throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value);
                         }
                         break;
                     case BATCH_DURATION:
                         requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
-                        if ( requestedBatchNanos < 0 ) {
+                        if (requestedBatchNanos < 0) {
                             throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value);
                         }
                         break;
                     case PORT_IDENTIFIER: {
                         Port receivedPort = rootGroup.getInputPort(value);
-                        if ( receivedPort == null ) {
+                        if (receivedPort == null) {
                             receivedPort = rootGroup.getOutputPort(value);
                         }
-                        if ( receivedPort == null ) {
+                        if (receivedPort == null) {
                             logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
                             ResponseCode.UNKNOWN_PORT.writeResponse(dos);
                             throw new HandshakeException("Received unknown port identifier: " + value);
                         }
-                        if ( !(receivedPort instanceof RootGroupPort) ) {
+                        if (!(receivedPort instanceof RootGroupPort)) {
                             logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
                             ResponseCode.UNKNOWN_PORT.writeResponse(dos);
                             throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
                         }
-                        
+
                         this.port = (RootGroupPort) receivedPort;
                         final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
-                        if ( !portAuthResult.isAuthorized() ) {
+                        if (!portAuthResult.isAuthorized()) {
                             logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
                             ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
                             responseWritten = true;
                             break;
                         }
-                        
-                        if ( !receivedPort.isValid() ) {
+
+                        if (!receivedPort.isValid()) {
                             logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
                             ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
                             responseWritten = true;
                             break;
                         }
-                        
-                        if ( !receivedPort.isRunning() ) {
+
+                        if (!receivedPort.isRunning()) {
                             logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
                             ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
                             responseWritten = true;
                             break;
                         }
-                        
+
                         // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this
                         // we will simply not service the request but the sender will time out
-                        if ( getVersionNegotiator().getVersion() > 1 ) {
-                            for ( final Connection connection : port.getConnections() ) {
-                                if ( connection.getFlowFileQueue().isFull() ) {
+                        if (getVersionNegotiator().getVersion() > 1) {
+                            for (final Connection connection : port.getConnections()) {
+                                if (connection.getFlowFileQueue().isFull()) {
                                     logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
                                     ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
                                     responseWritten = true;
@@ -219,7 +218,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                                 }
                             }
                         }
-                        
+
                         break;
                     }
                 }
@@ -227,54 +226,54 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                 throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value);
             }
         }
-        
-        if ( useGzip == null ) {
+
+        if (useGzip == null) {
             logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
             ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
             throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
         }
-        
+
         // send "OK" response
-        if ( !responseWritten ) {
+        if (!responseWritten) {
             ResponseCode.PROPERTIES_OK.writeResponse(dos);
         }
-        
+
         logger.debug("{} Finished handshake with {}", this, peer);
         handshakeCompleted = true;
     }
-    
+
     @Override
     public boolean isHandshakeSuccessful() {
         return handshakeCompleted;
     }
-    
+
     @Override
     public RootGroupPort getPort() {
         return port;
     }
-    
+
     @Override
     public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
-        if ( !handshakeCompleted ) {
+        if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
-        logger.debug("{} Negotiating Codec with {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()});
+        logger.debug("{} Negotiating Codec with {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
-        
-        if ( port == null ) {
-        	RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
+
+        if (port == null) {
+            RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified");
         }
-        
+
         // Negotiate the FlowFileCodec to use.
         try {
             negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos);
-            logger.debug("{} Negotiated Codec {} with {}", new Object[] {this, negotiatedFlowFileCodec, peer});
+            logger.debug("{} Negotiated Codec {} with {}", new Object[]{this, negotiatedFlowFileCodec, peer});
             return negotiatedFlowFileCodec;
         } catch (final HandshakeException e) {
             throw new ProtocolException(e.toString());
@@ -286,13 +285,12 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         return negotiatedFlowFileCodec;
     }
 
-    
     @Override
     public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        if ( !handshakeCompleted ) {
+        if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
@@ -301,22 +299,22 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
         String remoteDn = commsSession.getUserDn();
-        if ( remoteDn == null ) {
+        if (remoteDn == null) {
             remoteDn = "none";
         }
 
         FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
+        if (flowFile == null) {
             // we have no data to send. Notify the peer.
             logger.debug("{} No data to send to {}", this, peer);
             ResponseCode.NO_MORE_DATA.writeResponse(dos);
             return 0;
         }
-        
+
         // we have data to send.
         logger.debug("{} Data is available to send to {}", this, peer);
         ResponseCode.MORE_DATA.writeResponse(dos);
-        
+
         final StopWatch stopWatch = new StopWatch(true);
         long bytesSent = 0L;
         final Set<FlowFile> flowFilesSent = new HashSet<>();
@@ -328,27 +326,27 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         String calculatedCRC = "";
         while (continueTransaction) {
             final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(dos) : dos;
-            logger.debug("{} Sending {} to {}", new Object[] {this, flowFile, peer});
-            
+            logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer});
+
             final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
 
             final StopWatch transferWatch = new StopWatch(true);
-            
+
             final FlowFile toSend = flowFile;
             session.read(flowFile, new InputStreamCallback() {
-				@Override
-				public void process(final InputStream in) throws IOException {
-					final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
-					codec.encode(dataPacket, checkedOutputStream);
-				}
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize());
+                    codec.encode(dataPacket, checkedOutputStream);
+                }
             });
-            
+
             final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
-            
+
             // need to close the CompressionOutputStream in order to force it to write out any remaining bytes.
             // Otherwise, do NOT close it because we don't want to close the underlying stream
             // (CompressionOutputStream will not close the underlying stream when it's closed)
-            if ( useGzip ) {
+            if (useGzip) {
                 checkedOutputStream.close();
             }
 
@@ -358,33 +356,33 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
             session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
             session.remove(flowFile);
-            
+
             // determine if we should check for more data on queue.
             final long sendingNanos = System.nanoTime() - startNanos;
             boolean poll = true;
-            if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) {
+            if (sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L) {
                 poll = false;
             }
-            if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) {
+            if (bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L) {
                 poll = false;
             }
-            if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) {
+            if (flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0) {
                 poll = false;
             }
-            
-            if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) {
+
+            if (requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0) {
                 poll = (sendingNanos < DEFAULT_BATCH_NANOS);
             }
-            
-            if ( poll ) { 
+
+            if (poll) {
                 // we've not elapsed the requested sending duration, so get more data.
                 flowFile = session.get();
             } else {
                 flowFile = null;
             }
-            
+
             continueTransaction = (flowFile != null);
-            if ( continueTransaction ) {
+            if (continueTransaction) {
                 logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
                 ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
             } else {
@@ -393,19 +391,21 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                 calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue());
             }
         }
-        
+
         // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
         final Response transactionConfirmationResponse = Response.read(dis);
-        if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+        if (transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION) {
             // Confirm Checksum and echo back the confirmation.
             logger.debug("{} Received {}  from {}", this, transactionConfirmationResponse, peer);
             final String receivedCRC = transactionConfirmationResponse.getMessage();
 
-            if ( versionNegotiator.getVersion() > 3 ) {
-                if ( !receivedCRC.equals(calculatedCRC) ) {
+            if (versionNegotiator.getVersion() > 3) {
+                if (!receivedCRC.equals(calculatedCRC)) {
                     ResponseCode.BAD_CHECKSUM.writeResponse(dos);
                     session.rollback();
-                    throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                    throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as "
+                            + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC
+                            + "; canceling transaction and rolling back session");
                 }
             }
 
@@ -415,61 +415,60 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         }
 
         final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
-        
+
         final Response transactionResponse;
         try {
             transactionResponse = Response.read(dis);
         } catch (final IOException e) {
-            logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
-                " It is unknown whether or not the peer successfully received/processed the data." +
-                " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", 
-                this, peer, session, flowFileDescription);
+            logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator."
+                    + " It is unknown whether or not the peer successfully received/processed the data."
+                    + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
+                    this, peer, session, flowFileDescription);
             session.rollback();
             throw e;
         }
-        
-        logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
-        if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+
+        logger.debug("{} received {} from {}", new Object[]{this, transactionResponse, peer});
+        if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
             peer.penalize(port.getIdentifier(), port.getYieldPeriod(TimeUnit.MILLISECONDS));
-        } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+        } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
             throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
         }
-        
+
         session.commit();
-        
+
         stopWatch.stop();
         final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
         final String dataSize = FormatUtils.formatDataSize(bytesSent);
-        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{
             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
 
         return flowFilesSent.size();
     }
-    
-    
+
     @Override
     public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
-        if ( !handshakeCompleted ) {
+        if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
         logger.debug("{} receiving FlowFiles from {}", this, peer);
-        
+
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
         String remoteDn = commsSession.getUserDn();
-        if ( remoteDn == null ) {
+        if (remoteDn == null) {
             remoteDn = "none";
         }
 
         final StopWatch stopWatch = new StopWatch(true);
         final CRC32 crc = new CRC32();
-        
+
         // Peer has data. Otherwise, we would not have been called, because they would not have sent
         // a SEND_FLOWFILES request to us. Just decode the bytes into FlowFiles until peer says he's
         // finished sending data.
@@ -486,18 +485,19 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             FlowFile flowFile = session.create();
             flowFile = session.importFrom(dataPacket.getData(), flowFile);
             flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes());
-            
+
             final long transferNanos = System.nanoTime() - startNanos;
             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
             final String sourceSystemFlowFileUuid = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
             flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
-            
+
             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
-            session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
+            session.getProvenanceReporter().receive(flowFile, transitUri, sourceSystemFlowFileUuid == null
+                    ? null : "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
             session.transfer(flowFile, Relationship.ANONYMOUS);
             flowFilesReceived.add(flowFile);
             bytesReceived += flowFile.getSize();
-            
+
             final Response transactionResponse = Response.read(dis);
             switch (transactionResponse.getCode()) {
                 case CONTINUE_TRANSACTION:
@@ -516,7 +516,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                     throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse);
             }
         }
-        
+
         // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
         // to peer so that we can verify that the connection is still open. This is a two-phase commit,
         // which helps to prevent the chances of data duplication. Without doing this, we may commit the
@@ -526,7 +526,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
         logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
         ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
-        
+
         final Response confirmTransactionResponse = Response.read(dis);
         logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer);
 
@@ -539,11 +539,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             default:
                 throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
         }
-        
+
         // Commit the session so that we have persisted the data
         session.commit();
-        
-        if ( context.getAvailableRelationships().isEmpty() ) {
+
+        if (context.getAvailableRelationships().isEmpty()) {
             // Confirm that we received the data and the peer can now discard it but that the peer should not
             // send any more data for a bit
             logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
@@ -553,30 +553,30 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
             ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
         }
-        
+
         stopWatch.stop();
         final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
         final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
         final String dataSize = FormatUtils.formatDataSize(bytesReceived);
-        logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+        logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{
             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
 
         return flowFilesReceived.size();
     }
-    
+
     @Override
     public RequestType getRequestType(final Peer peer) throws IOException {
-        if ( !handshakeCompleted ) {
+        if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
-        logger.debug("{} Reading Request Type from {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()});
+        logger.debug("{} Reading Request Type from {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
         final RequestType requestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()));
-        logger.debug("{} Got Request Type {} from {}", new Object[] {this, requestType, peer});
+        logger.debug("{} Got Request Type {} from {}", new Object[]{this, requestType, peer});
 
         return requestType;
     }
@@ -599,10 +599,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
 
     @Override
     public void sendPeerList(final Peer peer) throws IOException {
-        if ( !handshakeCompleted ) {
+        if (!handshakeCompleted) {
             throw new IllegalStateException("Handshake has not been completed");
         }
-        if ( shutdown ) {
+        if (shutdown) {
             throw new IllegalStateException("Protocol is shutdown");
         }
 
@@ -611,7 +611,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
 
         final NiFiProperties properties = NiFiProperties.getInstance();
-        
+
         // we have only 1 peer: ourselves.
         dos.writeInt(1);
         dos.writeUTF(InetAddress.getLocalHost().getHostName());
@@ -620,12 +620,12 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
         dos.writeInt(0);    // doesn't matter how many FlowFiles we have, because we're the only host.
         dos.flush();
     }
-    
+
     @Override
     public String getResourceName() {
         return RESOURCE_NAME;
     }
-    
+
     @Override
     public void setNodeInformant(final NodeInformant nodeInformant) {
     }
@@ -634,7 +634,7 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
     public long getRequestExpiration() {
         return requestExpirationMillis;
     }
-    
+
     @Override
     public String toString() {
         return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]";

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
index b9a567b..8380f8b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardSiteToSiteProtocol.java
@@ -39,13 +39,13 @@ package org.apache.nifi.remote;
 //        final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>();
 //        final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3);
 //        final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500);
-//        
+//
 //        final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null);
 //        final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null);
-//        
+//
 //        destinationMap.put(node1, node1Destination);
 //        destinationMap.put(node2, node2Destination);
-//        
+//
 //        final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND);
 //        int node1Count = 0, node2Count = 0;
 //        for ( final Destination destination : destinations ) {
@@ -57,30 +57,30 @@ package org.apache.nifi.remote;
 //                Assert.fail("Got Destination for unknkown NodeInformation");
 //            }
 //        }
-//        
+//
 //        System.out.println(node1Count);
 //        System.out.println(node2Count);
-//        
+//
 //        final double node1Pct = (double) node1Count / (double) (node1Count + node2Count);
 //        assertEquals(0.80, node1Pct, 0.01);
-//        // node1  should get the most but is not allowed to have more than approximately 80% of the data. 
+//        // node1  should get the most but is not allowed to have more than approximately 80% of the data.
 //    }
-//    
+//
 //    @Test
 //    public void testWeightedDistributionWithThreeNodes() throws IOException {
 //        final Map<NodeInformation, Destination> destinationMap = new LinkedHashMap<>();
 //        final NodeInformation node1 = new NodeInformation("hostA", 80, 90, true, 3);
 //        final NodeInformation node2 = new NodeInformation("hostB", 80, 90, true, 500);
 //        final NodeInformation node3 = new NodeInformation("hostC", 80, 90, true, 500);
-//        
+//
 //        final Destination node1Destination = new Destination(createRemoteGroupPort("PortA"), null, node1, TransferDirection.SEND, true, null);
 //        final Destination node2Destination = new Destination(createRemoteGroupPort("PortB"), null, node2, TransferDirection.SEND, true, null);
 //        final Destination node3Destination = new Destination(createRemoteGroupPort("PortC"), null, node3, TransferDirection.SEND, true, null);
-//        
+//
 //        destinationMap.put(node1, node1Destination);
 //        destinationMap.put(node2, node2Destination);
 //        destinationMap.put(node3, node3Destination);
-//        
+//
 //        final List<Destination> destinations = StandardSiteToSiteProtocol.formulateDestinationList(destinationMap, TransferDirection.SEND);
 //        int node1Count = 0, node2Count = 0, node3Count = 0;
 //        for ( final Destination destination : destinations ) {
@@ -94,20 +94,20 @@ package org.apache.nifi.remote;
 //                Assert.fail("Got Destination for unknkown NodeInformation");
 //            }
 //        }
-//        
+//
 //        System.out.println(node1Count);
 //        System.out.println(node2Count);
 //        System.out.println(node3Count);
-//        
+//
 //        final double node1Pct = (double) node1Count / (double) (node1Count + node2Count + node3Count);
 //        final double node2Pct = (double) node2Count / (double) (node1Count + node2Count + node3Count);
 //        final double node3Pct = (double) node3Count / (double) (node1Count + node2Count + node3Count);
-//        
+//
 //        assertEquals(0.5, node1Pct, 0.02);
 //        assertEquals(0.25, node2Pct, 0.02);
 //        assertEquals(node2Pct, node3Pct, 0.02);
 //    }
-//    
+//
 //    private RemoteGroupPort createRemoteGroupPort(final String portName) {
 //        RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
 //        Mockito.when(port.getName()).thenReturn(portName);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9faaef8c/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
index 4e55f5f..03f8190 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/io/socket/TestSocketChannelStreams.java
@@ -65,7 +65,7 @@ package org.apache.nifi.remote.io.socket;
 //        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
 //        final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 5000));
 //        channel.configureBlocking(false);
-//        
+//
 //        final CommunicationsSession commsSession;
 //        commsSession = new SocketChannelCommunicationsSession(channel, "", null);
 //        commsSession.setUri("nifi://localhost:5000");
@@ -74,7 +74,7 @@ package org.apache.nifi.remote.io.socket;
 //
 //        dos.write(CommunicationsProtocol.MAGIC_BYTES);
 //        dos.flush();
-//        
+//
 //        final EventReporter eventReporter = Mockito.mock(EventReporter.class);
 //        final StandardSiteToSiteProtocol proposedProtocol = new StandardSiteToSiteProtocol(commsSession, eventReporter, NiFiProperties.getInstance());
 //
@@ -84,20 +84,20 @@ package org.apache.nifi.remote.io.socket;
 //        final RemoteProcessGroup rpg = Mockito.mock(RemoteProcessGroup.class);
 //        Mockito.when(rpg.getCommunicationsTimeout(Mockito.any(TimeUnit.class))).thenReturn(2000);
 //        Mockito.when(rpg.getTargetUri()).thenReturn( new URI("https://localhost:5050/") );
-//        
+//
 //        final RemoteGroupPort port = Mockito.mock(RemoteGroupPort.class);
 //        Mockito.when(port.getIdentifier()).thenReturn("90880680-d6da-40be-b2cc-a15423de2e1a");
 //        Mockito.when(port.getName()).thenReturn("Data In");
 //        Mockito.when(port.getRemoteProcessGroup()).thenReturn(rpg);
-//        
+//
 //        negotiatedProtocol.initiateHandshake(port, TransferDirection.SEND);
 //    }
-//    
+//
 //    @Test
 //    public void testInputOutputStreams() throws IOException, InterruptedException {
 //        final ServerThread server = new ServerThread();
 //        server.start();
-//        
+//
 //        int port = server.getPort();
 //        while ( port <= 0 ) {
 //            Thread.sleep(10L);
@@ -106,11 +106,11 @@ package org.apache.nifi.remote.io.socket;
 //
 //        final SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", port));
 //        channel.configureBlocking(false);
-//        
+//
 //        final OutputStream out = new SocketChannelOutputStream(channel);
 //        final InputStream in = new SocketChannelInputStream(channel);
 //        final DataInputStream dataIn = new DataInputStream(in);
-//        
+//
 //        final byte[] sent = new byte[DATA_SIZE];
 //        for (int i=0; i < sent.length; i++) {
 //            sent[i] = (byte) (i % 255);
@@ -125,21 +125,21 @@ package org.apache.nifi.remote.io.socket;
 //            final float megabytes = (float) DATA_SIZE / (1024F * 1024F);
 //            final float MBperS = megabytes / seconds;
 //            System.out.println("Millis: " + millis + "; MB/s: " + MBperS);
-//            
+//
 //            Thread.sleep(2500L);
 //            final byte[] received = server.getReceivedData();
 //            System.out.println("Server received " + received.length + " bytes");
 //            server.clearReceivedData();
 //            assertTrue(Arrays.equals(sent, received));
-//            
+//
 //            final long val = dataIn.readLong();
 //            assertEquals(DATA_SIZE, val);
 //            System.out.println(val);
 //        }
-//        
+//
 //        server.shutdown();
 //    }
-//    
+//
 //    public final long toLong(final byte[] buffer) throws IOException {
 //        return (((long)buffer[0] << 56) +
 //                ((long)(buffer[1] & 255) << 48) +
@@ -150,82 +150,82 @@ package org.apache.nifi.remote.io.socket;
 //                ((buffer[6] & 255) <<  8) +
 //                ((buffer[7] & 255) <<  0));
 //    }
-//    
+//
 //    private static class ServerThread extends Thread {
 //        private int listeningPort;
 //        private final ByteArrayOutputStream received = new ByteArrayOutputStream();
-//        
+//
 //        private volatile int readingDelay = 0;
 //        private volatile boolean shutdown = false;
-//        
+//
 //        public ServerThread() {
 //        }
-//        
+//
 //        public int getPort() {
 //            return listeningPort;
 //        }
-//        
+//
 //        public byte[] getReceivedData() {
 //            return received.toByteArray();
 //        }
-//        
+//
 //        @Override
 //        public void run() {
 //            try {
 //                final ServerSocketFactory serverSocketFactory = ServerSocketFactory.getDefault();
 //                final ServerSocket serverSocket = serverSocketFactory.createServerSocket(0);
 //                this.listeningPort = serverSocket.getLocalPort();
-//                
+//
 //                final Socket socket = serverSocket.accept();
 //                final InputStream stream = socket.getInputStream();
 //                final DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
-//                
+//
 //                final byte[] buffer = new byte[4096];
 //                int len;
-//                
+//
 //                while (!shutdown) {
 //                    try {
 //                        len = stream.read(buffer);
-//                        
+//
 //                        System.out.println("Received " + len + " bytes");
-//                        
+//
 //                        if ( readingDelay > 0 ) {
 //                            try { Thread.sleep(readingDelay); } catch (final InterruptedException e) {}
 //                        }
 //                    } catch (final SocketTimeoutException e) {
 //                        continue;
 //                    }
-//                    
+//
 //                    if ( len < 0 ) {
 //                        return;
 //                    }
-//                    
+//
 //                    received.write(buffer, 0, len);
-//                    
+//
 //                    final long length = received.size();
 //                    if ( length % (DATA_SIZE) == 0 ) {
 //                        dos.writeLong(length);
 //                        dos.flush();
 //                    }
 //                }
-//                
+//
 //                System.out.println("Server successfully shutdown");
 //            } catch (final Exception e) {
 //                e.printStackTrace();
 //            }
 //        }
-//        
+//
 //        public void clearReceivedData() {
 //            this.received.reset();
 //        }
-//        
+//
 //        public void shutdown() {
 //            this.shutdown = true;
 //        }
-//        
+//
 //        public void delayReading(final int millis) {
 //            this.readingDelay = millis;
 //        }
 //    }
-//    
+//
 //}