You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/16 17:57:50 UTC

[5/8] incubator-nifi git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 0000000,8c60e4b..acb3a01
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@@ -1,0 -1,173 +1,194 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.controller.scheduling;
+ 
++import java.util.Collection;
++import java.util.Collections;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.Map;
++import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.connectable.Connectable;
++import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
++import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.ProcessException;
+ 
+ /**
+  * A {@link ProcessContext} for {@link Connectable}s that are not Processors.
+  * <p>
+  * This context exposes no properties: every property lookup yields an unset
+  * {@link PropertyValue} and {@link #getProperties()} is always empty.
+  * Yielding and the concurrent-task limit are delegated to the wrapped
+  * {@link Connectable}; encryption and decryption are delegated to the
+  * supplied {@link StringEncryptor}.
+  * </p>
+  */
+ public class ConnectableProcessContext implements ProcessContext {
+ 
+     private final Connectable connectable;
+     private final StringEncryptor encryptor;
+ 
+     /**
+      * @param connectable the component this context is created for
+      * @param encryptor used by {@link #encrypt(String)} and {@link #decrypt(String)}
+      */
+     public ConnectableProcessContext(final Connectable connectable, final StringEncryptor encryptor) {
+         this.connectable = connectable;
+         this.encryptor = encryptor;
+     }
+ 
+     /**
+      * Delegates to {@link #getProperty(String)} using the descriptor's name.
+      */
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return getProperty(descriptor.getName());
+     }
+ 
+     /**
+      * Returns a {@link PropertyValue} that is never set: every typed accessor
+      * returns <code>null</code>, {@link PropertyValue#isSet()} returns
+      * <code>false</code> and expression evaluation returns the value itself.
+      */
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         // The property name is not consulted: the same kind of unset value is produced for every lookup
+         return new PropertyValue() {
+             @Override
+             public String getValue() {
+                 return null;
+             }
+ 
+             @Override
+             public Integer asInteger() {
+                 return null;
+             }
+ 
+             @Override
+             public Long asLong() {
+                 return null;
+             }
+ 
+             @Override
+             public Boolean asBoolean() {
+                 return null;
+             }
+ 
+             @Override
+             public Float asFloat() {
+                 return null;
+             }
+ 
+             @Override
+             public Double asDouble() {
+                 return null;
+             }
+ 
+             @Override
+             public Long asTimePeriod(final TimeUnit timeUnit) {
+                 return null;
+             }
+ 
+             @Override
+             public Double asDataSize(final DataUnit dataUnit) {
+                 return null;
+             }
+ 
+             @Override
+             public PropertyValue evaluateAttributeExpressions() throws ProcessException {
+                 return this;
+             }
+ 
+             @Override
+             public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
+                 return this;
+             }
+ 
+             @Override
+             public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
+                 return this;
+             }
+ 
+             @Override
+             public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
+                 return this;
+             }
+ 
+             @Override
+             public ControllerService asControllerService() {
+                 return null;
+             }
+ 
+             @Override
+             public <T extends ControllerService> T asControllerService(Class<T> serviceType) throws IllegalArgumentException {
+                 return null;
+             }
+ 
+             @Override
+             public boolean isSet() {
+                 return false;
+             }
+         };
+     }
+ 
+     /**
+      * Not supported by this context.
+      *
+      * @throws UnsupportedOperationException always
+      */
+     @Override
+     public PropertyValue newPropertyValue(String rawValue) {
+         throw new UnsupportedOperationException();
+     }
+ 
+     /**
+      * Yields the wrapped {@link Connectable}.
+      */
+     @Override
+     public void yield() {
+         connectable.yield();
+     }
+ 
+     /**
+      * @return the wrapped {@link Connectable}'s maximum number of concurrent tasks
+      */
+     @Override
+     public int getMaxConcurrentTasks() {
+         return connectable.getMaxConcurrentTasks();
+     }
+ 
+     /**
+      * @return always <code>null</code>
+      */
+     @Override
+     public String getAnnotationData() {
+         return null;
+     }
+ 
+     /**
+      * @return a new, empty map on every call
+      */
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return new HashMap<>();
+     }
+ 
+     /**
+      * Decrypts the given value with the {@link StringEncryptor} supplied at construction.
+      */
+     @Override
+     public String decrypt(String encrypted) {
+         return encryptor.decrypt(encrypted);
+     }
+ 
+     /**
+      * Encrypts the given value with the {@link StringEncryptor} supplied at construction.
+      */
+     @Override
+     public String encrypt(String unencrypted) {
+         return encryptor.encrypt(unencrypted);
+     }
+ 
+     /**
+      * @return always <code>null</code>
+      */
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return null;
+     }
++
++    /**
++     * Returns an empty set as soon as any of the connectable's connections
++     * reports a full queue; otherwise returns all of the connectable's
++     * relationships, copied into a new {@link HashSet} only when the
++     * connectable does not already expose them as a {@link Set}.
++     */
++    @Override
++    public Set<Relationship> getAvailableRelationships() {
++        for ( final Connection connection : connectable.getConnections() ) {
++            if ( connection.getFlowFileQueue().isFull() ) {
++                return Collections.emptySet();
++            }
++        }
++
++        final Collection<Relationship> relationships = connectable.getRelationships();
++        if ( relationships instanceof Set ) {
++            return (Set<Relationship>) relationships;
++        }
++        // Copy the collection that was just inspected rather than asking the connectable a second time
++        return new HashSet<>(relationships);
++    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 0000000,93a8c6b..cd0d31c
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@@ -1,0 -1,145 +1,173 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processor;
+ 
++import java.util.Collection;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.nifi.attribute.expression.language.PreparedQuery;
+ import org.apache.nifi.attribute.expression.language.Query;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
++import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ 
+ public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
+ 
+     private final ProcessorNode procNode;
+     private final ControllerServiceProvider controllerServiceProvider;
+     private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
+     private final StringEncryptor encryptor;
+ 
+     public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
+         this.procNode = processorNode;
+         this.controllerServiceProvider = controllerServiceProvider;
+         this.encryptor = encryptor;
+ 
+         preparedQueries = new HashMap<>();
+         for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
+             final PropertyDescriptor desc = entry.getKey();
+             String value = entry.getValue();
+             if (value == null) {
+                 value = desc.getDefaultValue();
+             }
+ 
+             final PreparedQuery pq = Query.prepare(value);
+             preparedQueries.put(desc, pq);
+         }
+     }
+ 
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return getProperty(descriptor.getName());
+     }
+ 
+     /**
+      * <p>
+      * Returns the currently configured value for the property with the given
+      * name.
+      * </p>
+      */
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         final Processor processor = procNode.getProcessor();
+         final PropertyDescriptor descriptor = processor.getPropertyDescriptor(propertyName);
+         if (descriptor == null) {
+             return null;
+         }
+ 
+         final String setPropertyValue = procNode.getProperty(descriptor);
+         final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
+ 
+         return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor));
+     }
+ 
+     @Override
+     public PropertyValue newPropertyValue(final String rawValue) {
+         return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue));
+     }
+ 
+     @Override
+     public void yield() {
+         procNode.yield();
+     }
+ 
+     @Override
+     public ControllerService getControllerService(final String serviceIdentifier) {
+         return controllerServiceProvider.getControllerService(serviceIdentifier);
+     }
+ 
+     @Override
+     public int getMaxConcurrentTasks() {
+         return procNode.getMaxConcurrentTasks();
+     }
+ 
+     @Override
+     public String getAnnotationData() {
+         return procNode.getAnnotationData();
+     }
+ 
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return procNode.getProperties();
+     }
+ 
+     @Override
+     public String encrypt(final String unencrypted) {
+         return encryptor.encrypt(unencrypted);
+     }
+ 
+     @Override
+     public String decrypt(final String encrypted) {
+         return encryptor.decrypt(encrypted);
+     }
+ 
+     @Override
+     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
+         if (!serviceType.isInterface()) {
+             throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
+         }
+         return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final ControllerService service) {
+         return controllerServiceProvider.isControllerServiceEnabled(service);
+     }
+ 
+     @Override
+     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+         return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
+     }
+ 
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return this;
+     }
++
++    @Override
++    public Set<Relationship> getAvailableRelationships() {
++        final Set<Relationship> set = new HashSet<>();
++        for (final Relationship relationship : procNode.getRelationships()) {
++            final Collection<Connection> connections = procNode.getConnections(relationship);
++            if (connections.isEmpty()) {
++                set.add(relationship);
++            } else {
++                boolean available = true;
++                for (final Connection connection : connections) {
++                    if (connection.getFlowFileQueue().isFull()) {
++                        available = false;
++                    }
++                }
++
++                if (available) {
++                    set.add(relationship);
++                }
++            }
++        }
++
++        return set;
++    }
++    
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 0000000,0fe08c9..318901f
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@@ -1,0 -1,107 +1,113 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.processor;
+ 
+ import java.util.Map;
++import java.util.Set;
+ 
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ 
+ /**
+  * {@link SchedulingContext} that forwards every {@link ProcessContext}
+  * operation to a wrapped context and adds the ability to lease a Controller
+  * Service on behalf of a {@link ProcessorNode}.
+  */
+ public class StandardSchedulingContext implements SchedulingContext {
+ 
+     private final ProcessContext processContext;
+     private final ControllerServiceProvider serviceProvider;
+     private final ProcessorNode processorNode;
+ 
+     public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) {
+         this.processContext = processContext;
+         this.serviceProvider = serviceProvider;
+         this.processorNode = processorNode;
+     }
+ 
+     /**
+      * Registers the processor node as a reference on the Controller Service
+      * with the given identifier.
+      *
+      * @throws IllegalArgumentException if no service has that identifier
+      * @throws IllegalStateException if the service is disabled or not valid
+      */
+     @Override
+     public void leaseControllerService(final String identifier) {
+         final ControllerServiceNode node = serviceProvider.getControllerServiceNode(identifier);
+ 
+         // Checks run in this order: existence, enabled state, validity
+         if (node == null) {
+             throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
+         } else if (node.isDisabled()) {
+             throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getControllerService() + " is currently disabled");
+         } else if (!node.isValid()) {
+             throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getControllerService() + " is not currently valid");
+         }
+ 
+         node.addReference(processorNode);
+     }
+ 
+     // ---- Everything below simply forwards to the wrapped ProcessContext ----
+ 
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return processContext.getProperty(descriptor);
+     }
+ 
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         return processContext.getProperty(propertyName);
+     }
+ 
+     @Override
+     public PropertyValue newPropertyValue(final String rawValue) {
+         return processContext.newPropertyValue(rawValue);
+     }
+ 
+     @Override
+     public void yield() {
+         processContext.yield();
+     }
+ 
+     @Override
+     public int getMaxConcurrentTasks() {
+         return processContext.getMaxConcurrentTasks();
+     }
+ 
+     @Override
+     public String getAnnotationData() {
+         return processContext.getAnnotationData();
+     }
+ 
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return processContext.getProperties();
+     }
+ 
+     @Override
+     public String encrypt(final String unencrypted) {
+         return processContext.encrypt(unencrypted);
+     }
+ 
+     @Override
+     public String decrypt(final String encrypted) {
+         return processContext.decrypt(encrypted);
+     }
+ 
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return processContext.getControllerServiceLookup();
+     }
++
++    @Override
++    public Set<Relationship> getAvailableRelationships() {
++        return processContext.getAvailableRelationships();
++    }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 0000000,22ec983..d4b4f61
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@@ -1,0 -1,510 +1,510 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.remote.protocol.socket;
+ 
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.zip.CRC32;
+ import java.util.zip.CheckedInputStream;
+ import java.util.zip.CheckedOutputStream;
+ 
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.remote.Peer;
+ import org.apache.nifi.remote.PeerStatus;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RemoteResourceFactory;
+ import org.apache.nifi.remote.StandardVersionNegotiator;
+ import org.apache.nifi.remote.VersionNegotiator;
+ import org.apache.nifi.remote.codec.FlowFileCodec;
+ import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+ import org.apache.nifi.remote.exception.HandshakeException;
+ import org.apache.nifi.remote.exception.ProtocolException;
+ import org.apache.nifi.remote.io.CompressionInputStream;
+ import org.apache.nifi.remote.io.CompressionOutputStream;
+ import org.apache.nifi.remote.protocol.ClientProtocol;
+ import org.apache.nifi.remote.protocol.CommunicationsSession;
+ import org.apache.nifi.remote.protocol.RequestType;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class SocketClientProtocol implements ClientProtocol {
+     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); // protocol versions offered; presumably in order of preference - confirm against StandardVersionNegotiator
+ 
+     
+     private RemoteGroupPort port; // assigned in setPort(); read during handshake()
+     private boolean useCompression; // copied from the port in setPort(); sent as the GZIP handshake property
+     
+     private String commsIdentifier; // random UUID generated at the start of handshake()
+     private boolean handshakeComplete = false; // set to true at the end of a handshake() call that did not throw
+     
+     private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
+     
+     private Response handshakeResponse = null; // response read from the peer during handshake()
+     private boolean readyForFileTransfer = false; // true only when the handshake response code was PROPERTIES_OK
+     private String transitUriPrefix = null; // peer URL with a trailing '/', set in handshake() when versionNegotiator reports version >= 3
+     
+     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+     
+     public SocketClientProtocol() { // nothing to initialise here; the port is supplied separately through setPort()
+     }
+ 
+     /**
+      * Associates this protocol with the given port and adopts that port's
+      * compression setting.
+      *
+      * @param port the remote group port whose identifier, timeout and
+      * compression flag are used during the handshake
+      */
+     public void setPort(final RemoteGroupPort port) {
+         // Store the port first, then mirror its compression flag locally
+         this.port = port;
+         this.useCompression = port.isUseCompression();
+     }
+     
+     /**
+      * Performs the handshake with the given peer. May be called only once
+      * per instance.
+      * <p>
+      * Writes a freshly generated communications identifier, then (when the
+      * version negotiator reports version 3 or later) the peer's URL, then the
+      * number of handshake properties followed by each property's name and
+      * value. After flushing, the peer's response is read and recorded.
+      * </p>
+      * <p>
+      * A response of PROPERTIES_OK marks this protocol as ready for file
+      * transfer. PORT_NOT_IN_VALID_STATE, UNKNOWN_PORT and
+      * PORTS_DESTINATION_FULL complete the handshake without making it ready.
+      * Any other response closes the peer and fails the handshake.
+      * </p>
+      *
+      * @param peer the peer to handshake with
+      * @throws IOException if communication with the peer fails
+      * @throws HandshakeException if the response cannot be read or is unexpected
+      * @throws IllegalStateException if the handshake has already been completed
+      */
+     @Override
+     public void handshake(final Peer peer) throws IOException, HandshakeException {
+         if ( handshakeComplete ) {
+             throw new IllegalStateException("Handshake has already been completed");
+         }
+         commsIdentifier = UUID.randomUUID().toString();
+         logger.debug("{} handshaking with {}", this, peer);
+ 
+         final Map<HandshakeProperty, String> properties = new HashMap<>();
+         properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
+         properties.put(HandshakeProperty.PORT_IDENTIFIER, port.getIdentifier());
+         properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(
+             port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)) );
+ 
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         commsSession.setTimeout(port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ 
+         dos.writeUTF(commsIdentifier);
+ 
+         // The peer URL is only sent from protocol version 3 onwards; it also becomes the transit URI prefix
+         if ( versionNegotiator.getVersion() >= 3 ) {
+             dos.writeUTF(peer.getUrl());
+             transitUriPrefix = peer.getUrl();
+ 
+             if ( !transitUriPrefix.endsWith("/") ) {
+                 transitUriPrefix = transitUriPrefix + "/";
+             }
+         }
+ 
+         dos.writeInt(properties.size());
+         for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+             dos.writeUTF(entry.getKey().name());
+             dos.writeUTF(entry.getValue());
+         }
+ 
+         dos.flush();
+ 
+         try {
+             handshakeResponse = Response.read(dis);
+         } catch (final ProtocolException e) {
+             throw new HandshakeException(e);
+         }
+ 
+         switch (handshakeResponse.getCode()) {
+             case PORT_NOT_IN_VALID_STATE:
+             case UNKNOWN_PORT:
+             case PORTS_DESTINATION_FULL:
+                 // Handshake finished, but the port cannot be used for transfer; callers query the reason afterwards
+                 break;
+             case PROPERTIES_OK:
+                 readyForFileTransfer = true;
+                 break;
+             default:
+                 // This is the handshake, not codec negotiation, so the message says so
+                 logger.error("{} received unexpected response {} from {} during handshake", new Object[] {
+                     this, handshakeResponse, peer});
+                 peer.close();
+                 throw new HandshakeException("Received unexpected response " + handshakeResponse);
+         }
+ 
+         logger.debug("{} Finished handshake with {}", this, peer);
+         handshakeComplete = true;
+     }
+     
+     /**
+      * @return <code>true</code> once a handshake has ended with PROPERTIES_OK
+      */
+     public boolean isReadyForFileTransfer() {
+         return readyForFileTransfer;
+     }
+ 
+     /**
+      * @return <code>true</code> if the handshake response was PORT_NOT_IN_VALID_STATE
+      * @throws IllegalStateException if the handshake has not completed
+      */
+     public boolean isPortInvalid() {
+         return handshakeResultIs(ResponseCode.PORT_NOT_IN_VALID_STATE);
+     }
+ 
+     /**
+      * @return <code>true</code> if the handshake response was UNKNOWN_PORT
+      * @throws IllegalStateException if the handshake has not completed
+      */
+     public boolean isPortUnknown() {
+         return handshakeResultIs(ResponseCode.UNKNOWN_PORT);
+     }
+ 
+     /**
+      * @return <code>true</code> if the handshake response was PORTS_DESTINATION_FULL
+      * @throws IllegalStateException if the handshake has not completed
+      */
+     public boolean isDestinationFull() {
+         return handshakeResultIs(ResponseCode.PORTS_DESTINATION_FULL);
+     }
+ 
+     /**
+      * Compares the recorded handshake response code with the expected one,
+      * refusing to answer before the handshake has completed.
+      */
+     private boolean handshakeResultIs(final ResponseCode expected) {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not completed successfully");
+         }
+         return handshakeResponse.getCode() == expected;
+     }
+     
+     /**
+      * Requests the peer list from the given peer and reads it back.
+      * <p>
+      * After sending the request, the reply is read as a peer count followed,
+      * for each peer, by hostname, port, secure flag and FlowFile count.
+      * </p>
+      *
+      * @param peer the peer to query
+      * @return the peer statuses read from the peer
+      * @throws IOException if communication fails or the peer reports a negative peer count
+      * @throws IllegalStateException if the handshake has not been performed
+      */
+     @Override
+     public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+ 
+         logger.debug("{} Get Peer Statuses from {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ 
+         RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
+         dos.flush();
+ 
+         // The count comes straight off the wire, so validate it before using it
+         final int numPeers = dis.readInt();
+         if ( numPeers < 0 ) {
+             throw new IOException("Received invalid number of Peer Statuses (" + numPeers + ") from " + peer);
+         }
+ 
+         // Cap the pre-sizing so a bogus count cannot force a huge allocation; the set still grows as needed
+         final Set<PeerStatus> peers = new HashSet<>(Math.min(numPeers, 1024));
+         for (int i=0; i < numPeers; i++) {
+             final String hostname = dis.readUTF();
+             // Named peerPort so it does not shadow the RemoteGroupPort field 'port'
+             final int peerPort = dis.readInt();
+             final boolean secure = dis.readBoolean();
+             final int flowFileCount = dis.readInt();
+             peers.add(new PeerStatus(hostname, peerPort, secure, flowFileCount));
+         }
+ 
+         logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
+         return peers;
+     }
+     
+     /**
+      * Negotiates the {@link FlowFileCodec} to use with the given peer,
+      * starting from a {@link StandardFlowFileCodec}.
+      *
+      * @param peer the peer to negotiate with
+      * @return the codec produced by the resource negotiation
+      * @throws IOException if communication with the peer fails
+      * @throws ProtocolException if the negotiation fails with a {@link HandshakeException};
+      * that exception is attached as the cause
+      * @throws IllegalStateException if the handshake has not been performed
+      */
+     @Override
+     public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+ 
+         logger.debug("{} Negotiating Codec with {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ 
+         RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
+ 
+         FlowFileCodec codec = new StandardFlowFileCodec();
+         try {
+             codec = (FlowFileCodec) RemoteResourceFactory.initiateResourceNegotiation(codec, dis, dos);
+         } catch (HandshakeException e) {
+             // Keep the message as before, but attach the original failure so its stack trace is not lost.
+             // NOTE(review): assumes ProtocolException(String) leaves the cause unset - confirm.
+             final ProtocolException protocolException = new ProtocolException(e.toString());
+             protocolException.initCause(e);
+             throw protocolException;
+         }
+         logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession});
+ 
+         return codec;
+     }
+ 
+     
+     /**
+      * Asks the given peer for FlowFiles, decodes whatever it sends and transfers each
+      * received FlowFile to {@link Relationship#ANONYMOUS} within the supplied session.
+      * <p>
+      * The exchange performed here is: write a RECEIVE_FLOWFILES request, read whether the peer
+      * has data, decode FlowFiles until the peer answers FINISH_TRANSACTION, reply with
+      * CONFIRM_TRANSACTION carrying the CRC32 accumulated over the bytes the codec read, wait for
+      * the peer's confirmation, commit the session, and finally acknowledge with either
+      * TRANSACTION_FINISHED or TRANSACTION_FINISHED_BUT_DESTINATION_FULL.
+      *
+      * @param peer the remote peer to pull data from
+      * @param context used after the commit to check whether any relationships are still available
+      * @param session the session that the received FlowFiles are created in and that is committed or rolled back
+      * @param codec the codec used to decode each FlowFile from the peer's stream
+      * @throws IOException if communication with the peer fails or the peer reports a bad checksum
+      * @throws ProtocolException if the peer answers with a response code that is not expected at that step
+      * @throws IllegalStateException if the handshake has not been performed or did not leave
+      *             this protocol ready for file transfer
+      */
+     @Override
+     public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+         if ( !readyForFileTransfer ) {
+             throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse);
+         }
+ 
+         logger.debug("{} Receiving FlowFiles from {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+         // The DN is only used in the provenance details; fall back to a placeholder when there is none
+         String userDn = commsSession.getUserDn();
+         if ( userDn == null ) {
+             userDn = "none";
+         }
+         
+         // Indicate that we would like to have some data
+         RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+         dos.flush();
+         
+         // Determine if Peer will send us data or has no data to send us
+         final Response dataAvailableCode = Response.read(dis);
+         switch (dataAvailableCode.getCode()) {
+             case MORE_DATA:
+                 logger.debug("{} {} Indicates that data is available", this, peer);
+                 break;
+             case NO_MORE_DATA:
+                 // Both placeholders need an argument; previously only 'peer' was supplied
+                 logger.debug("{} No data available from {}", this, peer);
+                 return;
+             default:
+                 throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+         }
+ 
+         final StopWatch stopWatch = new StopWatch(true);
+         final Set<FlowFile> flowFilesReceived = new HashSet<>();
+         long bytesReceived = 0L;
+         // A single CRC32 is shared by every per-FlowFile CheckedInputStream so that the checksum
+         // accumulates across the whole transaction
+         final CRC32 crc = new CRC32();
+         
+         // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data.
+         boolean continueTransaction = true;
+         String calculatedCRC = "";
+         while (continueTransaction) {
+             final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis;
+             final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
+             
+             final long startNanos = System.nanoTime();
+             FlowFile flowFile = codec.decode(checkedIn, session);
+             final long transmissionNanos = System.nanoTime() - startNanos;
+             final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
+             
+             // Remember the sender's UUID for provenance, then give the local FlowFile a fresh UUID
+             final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key());
+             flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+             
+             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+             session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis);
+             
+             session.transfer(flowFile, Relationship.ANONYMOUS);
+             bytesReceived += flowFile.getSize();
+             flowFilesReceived.add(flowFile);
+             logger.debug("{} Received {} from {}", this, flowFile, peer);
+             
+             // After every FlowFile the peer tells us whether more follow in this transaction
+             final Response transactionCode = Response.read(dis);
+             switch (transactionCode.getCode()) {
+                 case CONTINUE_TRANSACTION:
+                     logger.trace("{} Received ContinueTransaction indicator from {}", this, peer);
+                     break;
+                 case FINISH_TRANSACTION:
+                     logger.trace("{} Received FinishTransaction indicator from {}", this, peer);
+                     continueTransaction = false;
+                     calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue());
+                     break;
+                 default:
+                     throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received " + transactionCode);
+             }
+         }
+         
+         // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+         // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+         // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+         // session and then when we send the response back to the peer, the peer may have timed out and may not
+         // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+         // Critical Section involved in this transaction so that rather than the Critical Section being the
+         // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+         logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+         ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+         
+         final Response confirmTransactionResponse = Response.read(dis);
+         logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+         
+         switch (confirmTransactionResponse.getCode()) {
+             case CONFIRM_TRANSACTION:
+                 break;
+             case BAD_CHECKSUM:
+                 // The peer's checksum differs from ours: discard everything received in this session
+                 session.rollback();
+                 throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+             default:
+                 throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+         }
+         
+         // Commit the session so that we have persisted the data
+         session.commit();
+         
 -        if ( session.getAvailableRelationships().isEmpty() ) {
++        if ( context.getAvailableRelationships().isEmpty() ) {
+             // Confirm that we received the data and the peer can now discard it but that the peer should not
+             // send any more data for a bit
+             logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+             ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+         } else {
+             // Confirm that we received the data and the peer can now discard it
+             logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+             ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+         }
+         
+         stopWatch.stop();
+         // List the individual FlowFiles only for small batches; otherwise log just the count
+         final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+         final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+         final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+         logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+     }
+ 
+     /**
+      * Sends FlowFiles taken from the given session to the peer as a single transaction.
+      * <p>
+      * Returns immediately, without contacting the peer, when the session has no FlowFile available.
+      * Otherwise a SEND_FLOWFILES request is written and FlowFiles are encoded one after another until
+      * the session yields no more or BATCH_SEND_NANOS nanoseconds have elapsed since sending started.
+      * The peer must then answer with CONFIRM_TRANSACTION (its CRC32 is compared with ours only when the
+      * negotiated protocol version is greater than 3), and finally with TRANSACTION_FINISHED or
+      * TRANSACTION_FINISHED_BUT_DESTINATION_FULL before the session is committed.
+      *
+      * @param peer the remote peer to send data to
+      * @param context not referenced by this method body
+      * @param session the session that FlowFiles are pulled from and that is committed or rolled back
+      * @param codec the codec used to encode each FlowFile onto the peer's stream
+      * @throws IOException if communication with the peer fails or the checksums do not match
+      * @throws ProtocolException if the peer answers with a response code that is not expected at that step
+      * @throws IllegalStateException if the handshake has not been performed or did not leave
+      *             this protocol ready for file transfer
+      */
+     @Override
+     public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+         if ( !readyForFileTransfer ) {
+             throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse);
+         }
+ 
+         // Nothing queued: return before any bytes are exchanged with the peer
+         FlowFile flowFile = session.get();
+         if ( flowFile == null ) {
+             return;
+         }
+ 
+         logger.debug("{} Sending FlowFiles to {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+         // The DN is only used in the provenance details; fall back to a placeholder when there is none
+         String userDn = commsSession.getUserDn();
+         if ( userDn == null ) {
+             userDn = "none";
+         }
+         
+         // Indicate that we would like to send some data
+         RequestType.SEND_FLOWFILES.writeRequestType(dos);
+         dos.flush();
+         
+         final StopWatch stopWatch = new StopWatch(true);
+         // A single CRC32 is shared by every per-FlowFile CheckedOutputStream so that the checksum
+         // accumulates across the whole transaction
+         final CRC32 crc = new CRC32();
+         
+         long bytesSent = 0L;
+         final Set<FlowFile> flowFilesSent = new HashSet<>();
+         boolean continueTransaction = true;
+         String calculatedCRC = "";
+         final long startSendingNanos = System.nanoTime();
+         while (continueTransaction) {
+             final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos;
+             logger.debug("{} Sending {} to {}", this, flowFile, peer);
+             
+             final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
+             
+             final long startNanos = System.nanoTime();
+             flowFile = codec.encode(flowFile, session, checkedOutStream);
+             final long transferNanos = System.nanoTime() - startNanos;
+             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+             
+             // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+             // Otherwise, do NOT close it because we don't want to close the underlying stream
+             // (CompressionOutputStream will not close the underlying stream when it's closed)
+             if ( useCompression ) {
+                 checkedOutStream.close();
+             }
+             
+             flowFilesSent.add(flowFile);
+             bytesSent += flowFile.getSize();
+             logger.debug("{} Sent {} to {}", this, flowFile, peer);
+             
+             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+             session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+             session.remove(flowFile);
+             
+             // Keep pulling FlowFiles only while the batch time budget has not been used up
+             final long sendingNanos = System.nanoTime() - startSendingNanos;
+             if ( sendingNanos < BATCH_SEND_NANOS ) { 
+                 flowFile = session.get();
+             } else {
+                 flowFile = null;
+             }
+             
+             continueTransaction = (flowFile != null);
+             if ( continueTransaction ) {
+                 logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer);
+                 ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+             } else {
+                 logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+                 ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+                 
+                 // The shared CRC32 now covers every FlowFile written in this transaction
+                 calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() );
+             }
+         }
+         
+         // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+         final Response transactionConfirmationResponse = Response.read(dis);
+         if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+             // Confirm checksum and echo back the confirmation.
+             logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+             final String receivedCRC = transactionConfirmationResponse.getMessage();
+             
+             // The checksum is only compared for protocol versions above 3
+             if ( versionNegotiator.getVersion() > 3 ) {
+                 if ( !receivedCRC.equals(calculatedCRC) ) {
+                     ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                     session.rollback();
+                     throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                 }
+             }
+             
+             ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+         } else {
+             throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+         }
+ 
+         // List the individual FlowFiles only for small batches; otherwise log just the count
+         final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+ 
+         final Response transactionResponse;
+         try {
+             transactionResponse = Response.read(dis);
+         } catch (final IOException e) {
+             logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
+                     " It is unknown whether or not the peer successfully received/processed the data." +
+                     " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", 
+                     this, peer, session, flowFileDescription);
+             session.rollback();
+             throw e;
+         }
+         
+         logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+         if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+             // The destination accepted the data but is full: penalize the peer for the port's yield period
+             peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+         } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+             throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+         }
+         
+         // NOTE(review): the comment that used to sit here described draining the input stream so that
+         // the connection could be returned to a pool, but no such code exists in this method.
+         // The statements below only gather transfer statistics before the session is committed.
+         stopWatch.stop();
+         final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+         final String dataSize = FormatUtils.formatDataSize(bytesSent);
+         
+         session.commit();
+         
+         logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+     }
+ 
+     /**
+      * @return the negotiator that holds the protocol version in use by this instance
+      */
+     @Override
+     public VersionNegotiator getVersionNegotiator() {
+         return this.versionNegotiator;
+     }
+     
+     /**
+      * Marks this protocol as no longer ready for file transfer and writes a SHUTDOWN
+      * request to the given peer.
+      *
+      * @param peer the peer to notify
+      * @throws IOException if the request cannot be written or flushed
+      */
+     @Override
+     public void shutdown(final Peer peer) throws IOException {
+         readyForFileTransfer = false;
+ 
+         final OutputStream rawOut = peer.getCommunicationsSession().getOutput().getOutputStream();
+         final DataOutputStream requestStream = new DataOutputStream(rawOut);
+ 
+         logger.debug("{} Shutting down with {}", this, peer);
+ 
+         // Tell the peer that we are shutting down and push the request out right away
+         RequestType.SHUTDOWN.writeRequestType(requestStream);
+         requestStream.flush();
+     }
+ 
+     /**
+      * @return the fixed resource name of this protocol
+      */
+     @Override
+     public String getResourceName() {
+         final String resourceName = "SocketFlowFileProtocol";
+         return resourceName;
+     }
+     
+     /**
+      * @return a short description of this protocol instance that includes its communications identifier
+      */
+     @Override
+     public String toString() {
+         final StringBuilder description = new StringBuilder("SocketClientProtocol[CommsID=");
+         description.append(commsIdentifier);
+         description.append("]");
+         return description.toString();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 0000000,88b6a41..5edd4f9
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@@ -1,0 -1,581 +1,581 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.nifi.remote.protocol.socket;
+ 
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.net.InetAddress;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.zip.CRC32;
+ import java.util.zip.CheckedInputStream;
+ import java.util.zip.CheckedOutputStream;
+ 
+ import org.apache.nifi.cluster.NodeInformant;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.remote.Peer;
+ import org.apache.nifi.remote.PortAuthorizationResult;
+ import org.apache.nifi.remote.RemoteResourceFactory;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.remote.StandardVersionNegotiator;
+ import org.apache.nifi.remote.VersionNegotiator;
+ import org.apache.nifi.remote.codec.FlowFileCodec;
+ import org.apache.nifi.remote.exception.HandshakeException;
+ import org.apache.nifi.remote.exception.ProtocolException;
+ import org.apache.nifi.remote.io.CompressionInputStream;
+ import org.apache.nifi.remote.io.CompressionOutputStream;
+ import org.apache.nifi.remote.protocol.CommunicationsSession;
+ import org.apache.nifi.remote.protocol.RequestType;
+ import org.apache.nifi.remote.protocol.ServerProtocol;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.StopWatch;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ public class SocketFlowFileServerProtocol implements ServerProtocol {
+     // Name under which this protocol is identified
+     public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
+     
+     // Root group used by handshake() to look up the port named by the peer; set via setRootProcessGroup()
+     private ProcessGroup rootGroup;
+     // Identifier read from the peer at the start of handshake()
+     private String commsIdentifier;
+     // Set to true when handshake() runs to its end
+     private boolean handshakeCompleted;
+     
+     // Stays null until the peer supplies the GZIP handshake property; handshake() rejects a null value
+     private Boolean useGzip;
+     // Parsed from the REQUEST_EXPIRATION_MILLIS handshake property
+     private long requestExpirationMillis;
+     // Port resolved from the PORT_IDENTIFIER handshake property
+     private RootGroupPort port;
+     // Checked before handshake/negotiation/transfer; NOTE(review): where it is set is not visible in this part of the file
+     private boolean shutdown = false;
+     // Codec stored by negotiateCodec()
+     private FlowFileCodec negotiatedFlowFileCodec = null;
+     // Read from the peer during handshake() when the protocol version is >= 3; always ends with "/"
+     private String transitUriPrefix = null;
+     
+     // NOTE(review): the arguments are presumably the supported protocol versions in order of preference - confirm
+     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+     private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
+     
+     private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+ 
+     
+     /**
+      * Sets the root Process Group that {@link #handshake(Peer)} uses to resolve the port
+      * identifier sent by the peer.
+      *
+      * @param group the root Process Group
+      * @throws IllegalArgumentException if the given group is not the root group
+      */
+     @Override
+     public void setRootProcessGroup(final ProcessGroup group) {
+         if ( !group.isRootGroup() ) {
+             // Say what was wrong instead of throwing an exception without any message
+             throw new IllegalArgumentException("Specified Process Group is not the root group: " + group);
+         }
+         this.rootGroup = group;
+     }
+     
+     /**
+      * Performs the server side of the handshake with the given peer.
+      * <p>
+      * Reads the communications identifier, then (for protocol versions 3 and above) the transit URI
+      * prefix, then a counted list of name/value properties. Each property is evaluated and a response
+      * code is written to the peer: PROPERTIES_OK when nothing was wrong, otherwise the code describing
+      * the problem. Note that when a negative response such as UNAUTHORIZED, PORT_NOT_IN_VALID_STATE or
+      * PORTS_DESTINATION_FULL is written without an exception being thrown, this method still runs to
+      * its end and marks the handshake as completed.
+      *
+      * @param peer the peer to handshake with
+      * @throws IOException if reading from or writing to the peer fails
+      * @throws HandshakeException if the peer sends an unknown property name, names a port that does not
+      *             exist or is not a RootGroupPort, or omits the GZIP or PORT_IDENTIFIER property
+      * @throws IllegalStateException if the handshake has already been completed or the protocol is shut down
+      * @throws NumberFormatException if the REQUEST_EXPIRATION_MILLIS value is not a valid long
+      */
+     @Override
+     public void handshake(final Peer peer) throws IOException, HandshakeException {
+         if ( handshakeCompleted ) {
+             throw new IllegalStateException("Handshake has already been completed");
+         }
+         if ( shutdown ) {
+             throw new IllegalStateException("Protocol is shutdown");
+         }
+ 
+         logger.debug("{} Handshaking with {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+         
+         commsIdentifier = dis.readUTF();
+         
+         // The transit URI prefix is only sent for protocol versions 3 and above; make sure it ends with '/'
+         if ( versionNegotiator.getVersion() >= 3 ) {
+             transitUriPrefix = dis.readUTF();
+             if ( !transitUriPrefix.endsWith("/") ) {
+                 transitUriPrefix = transitUriPrefix + "/";
+             }
+         }
+         
+         // Read every property before evaluating any of them
+         final Map<String, String> properties = new HashMap<>();
+         final int numProperties = dis.readInt();
+         for (int i=0; i < numProperties; i++) {
+             final String propertyName = dis.readUTF();
+             final String propertyValue = dis.readUTF();
+             properties.put(propertyName, propertyValue);
+         }
+         
+         // evaluate the properties received
+         boolean responseWritten = false;
+         for ( final Map.Entry<String, String> entry : properties.entrySet() ) {
+             final String propertyName = entry.getKey();
+             final String value = entry.getValue();
+             
+             final HandshakeProperty property;
+             try {
+                 property = HandshakeProperty.valueOf(propertyName);
+             } catch (final Exception e) {
+                 ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName);
+                 throw new HandshakeException("Received unknown property: " + propertyName);
+             }
+             
+             switch (property) {
+                 case GZIP: {
+                     useGzip = Boolean.parseBoolean(value);
+                     break;
+                 }
+                 case REQUEST_EXPIRATION_MILLIS:
+                     requestExpirationMillis = Long.parseLong(value);
+                     break;
+                 case PORT_IDENTIFIER: {
+                     // The identifier may name either an input port or an output port of the root group
+                     Port receivedPort = rootGroup.getInputPort(value);
+                     if ( receivedPort == null ) {
+                         receivedPort = rootGroup.getOutputPort(value);
+                     }
+                     if ( receivedPort == null ) {
+                         logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+                         ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                         throw new HandshakeException("Received unknown port identifier: " + value);
+                     }
+                     if ( !(receivedPort instanceof RootGroupPort) ) {
+                         logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+                         ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                         throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
+                     }
+                     
+                     this.port = (RootGroupPort) receivedPort;
+                     final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+                     if ( !portAuthResult.isAuthorized() ) {
+                         // The message needs a '{}' placeholder; without it the explanation was never logged
+                         logger.debug("Responding with ResponseCode UNAUTHORIZED: {}", portAuthResult.getExplanation());
+                         ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
+                         responseWritten = true;
+                         break;
+                     }
+                     
+                     if ( !receivedPort.isValid() ) {
+                         logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                         ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
+                         responseWritten = true;
+                         break;
+                     }
+                     
+                     if ( !receivedPort.isRunning() ) {
+                         logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                         ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
+                         responseWritten = true;
+                         break;
+                     }
+                     
+                     // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this
+                     // we will simply not service the request but the sender will timeout
+                     if ( getVersionNegotiator().getVersion() > 1 ) {
+                         for ( final Connection connection : port.getConnections() ) {
+                             if ( connection.getFlowFileQueue().isFull() ) {
+                                 logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
+                                 ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
+                                 responseWritten = true;
+                                 // Leaves only the for-loop; one full queue is enough to report
+                                 break;
+                             }
+                         }
+                     }
+                     
+                     break;
+                 }
+             }
+         }
+         
+         // GZIP and PORT_IDENTIFIER are mandatory properties
+         if ( useGzip == null ) {
+             logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
+             ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
+             throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
+         }
+         if ( port == null ) {
+             logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing");
+             ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name());
+             throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name());
+         }
+         
+         // send "OK" response
+         if ( !responseWritten ) {
+             ResponseCode.PROPERTIES_OK.writeResponse(dos);
+         }
+         
+         logger.debug("{} Finished handshake with {}", this, peer);
+         handshakeCompleted = true;
+     }
+     
+     /**
+      * @return {@code true} once {@link #handshake(Peer)} has run to its end, {@code false} otherwise
+      */
+     @Override
+     public boolean isHandshakeSuccessful() {
+         return this.handshakeCompleted;
+     }
+     
+     /**
+      * @return the port resolved from the peer's PORT_IDENTIFIER property during the handshake,
+      *         or {@code null} if none has been resolved
+      */
+     @Override
+     public RootGroupPort getPort() {
+         return this.port;
+     }
+     
+     /**
+      * Negotiates with the given peer which {@link FlowFileCodec} to use and remembers the result
+      * so that it can later be retrieved via {@link #getPreNegotiatedCodec()}.
+      *
+      * @param peer the peer to negotiate with
+      * @return the negotiated codec
+      * @throws IOException if communication with the peer fails
+      * @throws ProtocolException if the negotiation fails with a HandshakeException
+      * @throws IllegalStateException if the handshake has not been completed or the protocol is shut down
+      */
+     @Override
+     public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+         if ( !handshakeCompleted ) {
+             throw new IllegalStateException("Handshake has not been completed");
+         }
+         if ( shutdown ) {
+             throw new IllegalStateException("Protocol is shutdown");
+         }
+ 
+         logger.debug("{} Negotiating Codec with {} using {}", this, peer, peer.getCommunicationsSession());
+         final CommunicationsSession session = peer.getCommunicationsSession();
+         final DataInputStream in = new DataInputStream(session.getInput().getInputStream());
+         final DataOutputStream out = new DataOutputStream(session.getOutput().getOutputStream());
+ 
+         // Only the negotiation itself can fail with a HandshakeException, so only it is guarded
+         try {
+             negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(in, out);
+         } catch (final HandshakeException e) {
+             throw new ProtocolException(e.toString());
+         }
+ 
+         logger.debug("{} Negotiated Codec {} with {}", this, negotiatedFlowFileCodec, peer);
+         return negotiatedFlowFileCodec;
+     }
+ 
+     /**
+      * @return the codec stored by the last successful {@link #negotiateCodec(Peer)} call,
+      *         or {@code null} if no codec has been negotiated yet
+      */
+     @Override
+     public FlowFileCodec getPreNegotiatedCodec() {
+         return this.negotiatedFlowFileCodec;
+     }
+ 
+     
+     @Override
+     public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+         if ( !handshakeCompleted ) {
+             throw new IllegalStateException("Handshake has not been completed");
+         }
+         if ( shutdown ) {
+             throw new IllegalStateException("Protocol is shutdown");
+         }
+ 
+         logger.debug("{} Sending FlowFiles to {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+         String remoteDn = commsSession.getUserDn();
+         if ( remoteDn == null ) {
+             remoteDn = "none";
+         }
+ 
+         FlowFile flowFile = session.get();
+         if ( flowFile == null ) {
+             // we have no data to send. Notify the peer.
+             logger.debug("{} No data to send to {}", this, peer);
+             ResponseCode.NO_MORE_DATA.writeResponse(dos);
+             return 0;
+         }
+         
+         // we have data to send.
+         logger.debug("{} Data is available to send to {}", this, peer);
+         ResponseCode.MORE_DATA.writeResponse(dos);
+         
+         final StopWatch stopWatch = new StopWatch(true);
+         long bytesSent = 0L;
+         final Set<FlowFile> flowFilesSent = new HashSet<>();
+         final CRC32 crc = new CRC32();
+ 
+         // send data until we reach some batch size
+         boolean continueTransaction = true;
+         final long startNanos = System.nanoTime();
+         String calculatedCRC = "";
+         while (continueTransaction) {
+             final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(dos) : dos;
+             logger.debug("{} Sending {} to {}", new Object[] {this, flowFile, peer});
+             
+             final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
+ 
+             final StopWatch transferWatch = new StopWatch(true);
+             flowFile = codec.encode(flowFile, session, checkedOutputStream);
+             final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
+             
+             // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+             // Otherwise, do NOT close it because we don't want to close the underlying stream
+             // (CompressionOutputStream will not close the underlying stream when it's closed)
+             if ( useGzip ) {
+                 checkedOutputStream.close();
+             }
+ 
+             flowFilesSent.add(flowFile);
+             bytesSent += flowFile.getSize();
+ 
+             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+             session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
+             session.remove(flowFile);
+             
+             final long sendingNanos = System.nanoTime() - startNanos;
+             if ( sendingNanos < BATCH_NANOS ) { 
+                 flowFile = session.get();
+             } else {
+                 flowFile = null;
+             }
+             
+             continueTransaction = (flowFile != null);
+             if ( continueTransaction ) {
+                 logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
+                 ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+             } else {
+                 logger.debug("{} Sending FinishTransaction indicator to {}", this, peer);
+                 ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+                 calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue());
+             }
+         }
+         
+         // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+         final Response transactionConfirmationResponse = Response.read(dis);
+         if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+             // Confirm Checksum and echo back the confirmation.
+             logger.debug("{} Received {}  from {}", this, transactionConfirmationResponse, peer);
+             final String receivedCRC = transactionConfirmationResponse.getMessage();
+ 
+             if ( versionNegotiator.getVersion() > 3 ) {
+                 if ( !receivedCRC.equals(calculatedCRC) ) {
+                     ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+                     session.rollback();
+                     throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+                 }
+             }
+ 
+             ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+         } else {
+             throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+         }
+ 
+         final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+         
+         final Response transactionResponse;
+         try {
+             transactionResponse = Response.read(dis);
+         } catch (final IOException e) {
+             logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
+                 " It is unknown whether or not the peer successfully received/processed the data." +
+                 " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", 
+                 this, peer, session, flowFileDescription);
+             session.rollback();
+             throw e;
+         }
+         
+         logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
+         if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+             peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+         } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+             throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+         }
+         
+         session.commit();
+         
+         stopWatch.stop();
+         final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+         final String dataSize = FormatUtils.formatDataSize(bytesSent);
+         logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ 
+         return flowFilesSent.size();
+     }
+     
+     
+     @Override // Receives FlowFiles sent by the peer into the session, sends the computed CRC32 to the peer for confirmation, commits the session, and returns the number of FlowFiles received.
+     public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+         if ( !handshakeCompleted ) {
+             throw new IllegalStateException("Handshake has not been completed");
+         }
+         if ( shutdown ) {
+             throw new IllegalStateException("Protocol is shutdown");
+         }
+ 
+         logger.debug("{} receiving FlowFiles from {}", this, peer);
+         
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+         String remoteDn = commsSession.getUserDn();
+         if ( remoteDn == null ) {
+             remoteDn = "none"; // placeholder that ends up in the provenance details string below
+         }
+ 
+         final StopWatch stopWatch = new StopWatch(true);
+         final CRC32 crc = new CRC32(); // one checksum shared by every per-FlowFile CheckedInputStream, so it accumulates across the whole transaction
+         
+         // Peer has data. Otherwise, we would not have been called, because they would not have sent
+         // a SEND_FLOWFILES request to us. Just decode the bytes into FlowFiles until peer says he's
+         // finished sending data.
+         final Set<FlowFile> flowFilesReceived = new HashSet<>();
+         long bytesReceived = 0L;
+         boolean continueTransaction = true;
+         String calculatedCRC = "";
+         while (continueTransaction) {
+             final long startNanos = System.nanoTime();
+             final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis; // a new decompression wrapper is created for each FlowFile when gzip is enabled
+             final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
+ 
+             FlowFile flowFile = codec.decode(checkedInputStream, session);
+             final long transferNanos = System.nanoTime() - startNanos;
+             final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+             final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); // remember the sender's UUID before it is overwritten
+             flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); // the received FlowFile gets a new random UUID locally
+             
+             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
+             session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
+             session.transfer(flowFile, Relationship.ANONYMOUS);
+             flowFilesReceived.add(flowFile);
+             bytesReceived += flowFile.getSize();
+             
+             final Response transactionResponse = Response.read(dis); // after each FlowFile the peer says whether more follow
+             switch (transactionResponse.getCode()) {
+                 case CONTINUE_TRANSACTION:
+                     logger.debug("{} Received ContinueTransaction indicator from {}", this, peer);
+                     break;
+                 case FINISH_TRANSACTION:
+                     logger.debug("{} Received FinishTransaction indicator from {}", this, peer);
+                     continueTransaction = false;
+                     calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue()); // value of the shared CRC32 after the last FlowFile was decoded
+                     break;
+                 default:
+                     throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse);
+             }
+         }
+         
+         // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+         // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+         // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+         // session and then when we send the response back to the peer, the peer may have timed out and may not
+         // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+         // Critical Section involved in this transaction so that rather than the Critical Section being the
+         // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+         logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+         ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); // our checksum travels as the message of the confirmation
+         
+         final Response confirmTransactionResponse = Response.read(dis);
+         logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer);
+ 
+         switch (confirmTransactionResponse.getCode()) {
+             case CONFIRM_TRANSACTION:
+                 break;
+             case BAD_CHECKSUM:
+                 session.rollback(); // discard everything received in this transaction before reporting the failure
+                 throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+             default: // NOTE(review): no explicit session.rollback() on this path, unlike BAD_CHECKSUM — confirm the caller cleans up the session
+                 throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+         }
+         
+         // Commit the session so that we have persisted the data
+         session.commit();
+         
 -        if ( session.getAvailableRelationships().isEmpty() ) {
++        if ( context.getAvailableRelationships().isEmpty() ) {
+             // Confirm that we received the data and the peer can now discard it but that the peer should not
+             // send any more data for a bit
+             logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+             ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+         } else {
+             // Confirm that we received the data and the peer can now discard it
+             logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+             ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+         }
+         
+         stopWatch.stop();
+         final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; // list individual FlowFiles only for small batches
+         final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+         final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+         logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ 
+         return flowFilesReceived.size(); // size of the set of FlowFiles collected in the loop above
+     }
+     
+     @Override // Reads and returns the next RequestType from the peer's input stream; throws IllegalStateException if the handshake is incomplete or the protocol is shut down.
+     public RequestType getRequestType(final Peer peer) throws IOException {
+         if ( !handshakeCompleted ) {
+             throw new IllegalStateException("Handshake has not been completed");
+         }
+         if ( shutdown ) {
+             throw new IllegalStateException("Protocol is shutdown");
+         }
+ 
+         logger.debug("{} Reading Request Type from {} using {}", new Object[] {this, peer, peer.getCommunicationsSession()});
+         final RequestType requestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream())); // wraps the session's input stream in a new DataInputStream for this read
+         logger.debug("{} Got Request Type {} from {}", new Object[] {this, requestType, peer});
+ 
+         return requestType;
+     }
+ 
+     @Override // Returns the VersionNegotiator held by this protocol instance.
+     public VersionNegotiator getVersionNegotiator() {
+         return versionNegotiator;
+     }
+ 
+     @Override // Marks this protocol as shut down; the peer argument is used only in the debug log message.
+     public void shutdown(final Peer peer) {
+         logger.debug("{} Shutting down with {}", this, peer);
+         shutdown = true; // receiveFlowFiles, getRequestType and sendPeerList throw IllegalStateException once this is set
+     }
+ 
+     @Override // Returns the current value of the shutdown flag, which shutdown(Peer) sets to true.
+     public boolean isShutdown() {
+         return shutdown;
+     }
+ 
+     @Override // Writes a one-entry peer list describing this host to the peer's output stream; throws IllegalStateException if the handshake is incomplete or the protocol is shut down.
+     public void sendPeerList(final Peer peer) throws IOException {
+         if ( !handshakeCompleted ) {
+             throw new IllegalStateException("Handshake has not been completed");
+         }
+         if ( shutdown ) {
+             throw new IllegalStateException("Protocol is shutdown");
+         }
+ 
+         logger.debug("{} Sending Peer List to {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ 
+         final NiFiProperties properties = NiFiProperties.getInstance();
+         
+         // we have only 1 peer: ourselves.
+         dos.writeInt(1); // number of peer entries that follow
+         dos.writeUTF(InetAddress.getLocalHost().getHostName()); // host name of the local machine as resolved by the JVM
+         dos.writeInt(properties.getRemoteInputPort());
+         dos.writeBoolean(properties.isSiteToSiteSecure());
+         dos.writeInt(0);    // doesn't matter how many FlowFiles we have, because we're the only host.
+         dos.flush(); // push the entry out now rather than leaving it buffered
+     }
+     
+     @Override // Returns the RESOURCE_NAME constant.
+     public String getResourceName() {
+         return RESOURCE_NAME;
+     }
+     
+     @Override // No-op: the supplied NodeInformant is ignored by this implementation.
+     public void setNodeInformant(final NodeInformant nodeInformant) {
+     }
+ 
+     @Override // Returns the requestExpirationMillis value (milliseconds, going by the field name).
+     public long getRequestExpiration() {
+         return requestExpirationMillis;
+     }
+     
+     @Override // Identifies this protocol instance by its communications identifier; this string appears as the "{}" self-reference in the log messages of this class.
+     public String toString() {
+         return "SocketFlowFileServerProtocol[CommsID=" + commsIdentifier + "]";
+     }
+ }