You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/16 17:57:50 UTC
[5/8] incubator-nifi git commit: Merge branch 'develop' of
https://git-wip-us.apache.org/repos/asf/incubator-nifi into develop
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
index 0000000,8c60e4b..acb3a01
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java
@@@ -1,0 -1,173 +1,194 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.scheduling;
+
++import java.util.Collection;
++import java.util.Collections;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.Map;
++import java.util.Set;
+ import java.util.concurrent.TimeUnit;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.connectable.Connectable;
++import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.expression.AttributeValueDecorator;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.processor.DataUnit;
+ import org.apache.nifi.processor.ProcessContext;
++import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.processor.exception.ProcessException;
+
+ /**
+  * A {@link ProcessContext} for {@link Connectable}s that are not Processors.
+  * Every property lookup yields an unset value and the property map is empty;
+  * yielding, the concurrent-task limit, encryption and relationship
+  * availability are delegated to the wrapped component or encryptor.
+  */
+ public class ConnectableProcessContext implements ProcessContext {
+
+     private final Connectable connectable;
+     private final StringEncryptor encryptor;
+
+     /**
+      * @param connectable the component that yield, task-limit and relationship queries are delegated to
+      * @param encryptor the encryptor backing {@link #encrypt(String)} and {@link #decrypt(String)}
+      */
+     public ConnectableProcessContext(final Connectable connectable, final StringEncryptor encryptor) {
+         this.connectable = connectable;
+         this.encryptor = encryptor;
+     }
+
+     /**
+      * Looks the property up by the descriptor's name; the result is always an
+      * unset value (see {@link #getProperty(String)}).
+      */
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return getProperty(descriptor.getName());
+     }
+
+     /**
+      * Returns a placeholder value regardless of the name given: every typed
+      * accessor returns {@code null}, {@code isSet()} is {@code false}, and
+      * expression evaluation returns the same placeholder.
+      */
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         return new PropertyValue() {
+             @Override
+             public String getValue() {
+                 return null;
+             }
+
+             @Override
+             public Integer asInteger() {
+                 return null;
+             }
+
+             @Override
+             public Long asLong() {
+                 return null;
+             }
+
+             @Override
+             public Boolean asBoolean() {
+                 return null;
+             }
+
+             @Override
+             public Float asFloat() {
+                 return null;
+             }
+
+             @Override
+             public Double asDouble() {
+                 return null;
+             }
+
+             @Override
+             public Long asTimePeriod(final TimeUnit timeUnit) {
+                 return null;
+             }
+
+             @Override
+             public Double asDataSize(final DataUnit dataUnit) {
+                 return null;
+             }
+
+             @Override
+             public PropertyValue evaluateAttributeExpressions() throws ProcessException {
+                 return this;
+             }
+
+             @Override
+             public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
+                 return this;
+             }
+
+             @Override
+             public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
+                 return this;
+             }
+
+             @Override
+             public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
+                 return this;
+             }
+
+             @Override
+             public ControllerService asControllerService() {
+                 return null;
+             }
+
+             @Override
+             public <T extends ControllerService> T asControllerService(Class<T> serviceType) throws IllegalArgumentException {
+                 return null;
+             }
+
+             @Override
+             public boolean isSet() {
+                 return false;
+             }
+         };
+     }
+
+     /**
+      * Not supported by this context.
+      *
+      * @throws UnsupportedOperationException always
+      */
+     @Override
+     public PropertyValue newPropertyValue(String rawValue) {
+         throw new UnsupportedOperationException();
+     }
+
+     /** Delegates to the wrapped component's {@code yield()}. */
+     @Override
+     public void yield() {
+         connectable.yield();
+     }
+
+     /** Returns the wrapped component's maximum number of concurrent tasks. */
+     @Override
+     public int getMaxConcurrentTasks() {
+         return connectable.getMaxConcurrentTasks();
+     }
+
+     /** Always {@code null}; this context carries no annotation data. */
+     @Override
+     public String getAnnotationData() {
+         return null;
+     }
+
+     /** Returns a new, empty map on every call. */
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return new HashMap<>();
+     }
+
+     /** Decrypts the given text with the encryptor supplied at construction. */
+     @Override
+     public String decrypt(String encrypted) {
+         return encryptor.decrypt(encrypted);
+     }
+
+     /** Encrypts the given text with the encryptor supplied at construction. */
+     @Override
+     public String encrypt(String unencrypted) {
+         return encryptor.encrypt(unencrypted);
+     }
+
+     /** Always {@code null}; no controller-service lookup is available here. */
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return null;
+     }
++
++    /**
++     * Returns the relationships that can currently accept data. If the queue
++     * of any connection reported by the component is full, nothing is
++     * available and an empty set is returned; otherwise all of the
++     * component's relationships are returned.
++     */
++    @Override
++    public Set<Relationship> getAvailableRelationships() {
++        for ( final Connection connection : connectable.getConnections() ) {
++            if ( connection.getFlowFileQueue().isFull() ) {
++                return Collections.emptySet();
++            }
++        }
++
++        final Collection<Relationship> relationships = connectable.getRelationships();
++        if ( relationships instanceof Set ) {
++            return (Set<Relationship>) relationships;
++        }
++        // Copy the collection that was already fetched and inspected rather than
++        // asking the component again, so the check and the copy see the same data.
++        return new HashSet<>(relationships);
++    }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
index 0000000,93a8c6b..cd0d31c
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java
@@@ -1,0 -1,145 +1,173 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processor;
+
++import java.util.Collection;
+ import java.util.HashMap;
++import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.nifi.attribute.expression.language.PreparedQuery;
+ import org.apache.nifi.attribute.expression.language.Query;
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
++import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.encrypt.StringEncryptor;
+
+ /**
+  * {@link ProcessContext} backed by a {@link ProcessorNode}. It also acts as
+  * the {@link ControllerServiceLookup} handed to the processor, delegating
+  * service queries to a {@link ControllerServiceProvider}.
+  */
+ public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
+
+     private final ProcessorNode procNode;
+     private final ControllerServiceProvider controllerServiceProvider;
+     // One prepared query per property the node reported at construction time.
+     private final Map<PropertyDescriptor, PreparedQuery> preparedQueries;
+     private final StringEncryptor encryptor;
+
+     /**
+      * Captures the collaborators and prepares a query for each property the
+      * node currently reports, substituting the descriptor's default value
+      * when the configured value is {@code null}.
+      */
+     public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
+         this.procNode = processorNode;
+         this.controllerServiceProvider = controllerServiceProvider;
+         this.encryptor = encryptor;
+
+         preparedQueries = new HashMap<>();
+         for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) {
+             final PropertyDescriptor desc = entry.getKey();
+             String value = entry.getValue();
+             if (value == null) {
+                 value = desc.getDefaultValue();
+             }
+
+             final PreparedQuery pq = Query.prepare(value);
+             preparedQueries.put(desc, pq);
+         }
+     }
+
+     /** Resolves the property by the descriptor's name; see {@link #getProperty(String)}. */
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return getProperty(descriptor.getName());
+     }
+
+     /**
+      * <p>
+      * Returns the currently configured value for the property with the given
+      * name, falling back to the descriptor's default value when nothing is
+      * configured.
+      * </p>
+      *
+      * @return the property value, or {@code null} if the processor has no
+      *         descriptor for the given name
+      */
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         final Processor processor = procNode.getProcessor();
+         final PropertyDescriptor descriptor = processor.getPropertyDescriptor(propertyName);
+         if (descriptor == null) {
+             return null;
+         }
+
+         final String setPropertyValue = procNode.getProperty(descriptor);
+         final String propValue = (setPropertyValue == null) ? descriptor.getDefaultValue() : setPropertyValue;
+
+         return new StandardPropertyValue(propValue, this, preparedQueries.get(descriptor));
+     }
+
+     /** Wraps an arbitrary raw value, preparing a fresh query for it. */
+     @Override
+     public PropertyValue newPropertyValue(final String rawValue) {
+         return new StandardPropertyValue(rawValue, this, Query.prepare(rawValue));
+     }
+
+     /** Delegates to the processor node's {@code yield()}. */
+     @Override
+     public void yield() {
+         procNode.yield();
+     }
+
+     /** Looks the service up through the controller service provider. */
+     @Override
+     public ControllerService getControllerService(final String serviceIdentifier) {
+         return controllerServiceProvider.getControllerService(serviceIdentifier);
+     }
+
+     /** Returns the processor node's maximum number of concurrent tasks. */
+     @Override
+     public int getMaxConcurrentTasks() {
+         return procNode.getMaxConcurrentTasks();
+     }
+
+     /** Returns the processor node's annotation data. */
+     @Override
+     public String getAnnotationData() {
+         return procNode.getAnnotationData();
+     }
+
+     /** Returns the property map exactly as reported by the processor node. */
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return procNode.getProperties();
+     }
+
+     /** Encrypts the given text with the encryptor supplied at construction. */
+     @Override
+     public String encrypt(final String unencrypted) {
+         return encryptor.encrypt(unencrypted);
+     }
+
+     /** Decrypts the given text with the encryptor supplied at construction. */
+     @Override
+     public String decrypt(final String encrypted) {
+         return encryptor.decrypt(encrypted);
+     }
+
+     /**
+      * Returns the identifiers of the services of the given type.
+      *
+      * @throws IllegalArgumentException if {@code serviceType} is not an interface
+      */
+     @Override
+     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
+         if (!serviceType.isInterface()) {
+             throw new IllegalArgumentException("ControllerServices may be referenced only via their interfaces; " + serviceType + " is not an interface");
+         }
+         return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
+     }
+
+     /** Asks the controller service provider whether the given service is enabled. */
+     @Override
+     public boolean isControllerServiceEnabled(final ControllerService service) {
+         return controllerServiceProvider.isControllerServiceEnabled(service);
+     }
+
+     /** Asks the controller service provider whether the identified service is enabled. */
+     @Override
+     public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+         return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
+     }
+
+     /** This context is its own controller-service lookup. */
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return this;
+     }
++
++    /**
++     * Returns the relationships that can currently accept data: those with no
++     * connections at all, and those none of whose connections has a full
++     * queue. A new set is built on every call.
++     */
++    @Override
++    public Set<Relationship> getAvailableRelationships() {
++        final Set<Relationship> set = new HashSet<>();
++        for (final Relationship relationship : procNode.getRelationships()) {
++            if (!hasFullQueue(procNode.getConnections(relationship))) {
++                set.add(relationship);
++            }
++        }
++
++        return set;
++    }
++
++    /**
++     * Reports whether any of the given connections has a full queue, stopping
++     * at the first one found. An empty collection has none.
++     */
++    private static boolean hasFullQueue(final Collection<Connection> connections) {
++        for (final Connection connection : connections) {
++            if (connection.getFlowFileQueue().isFull()) {
++                return true;
++            }
++        }
++        return false;
++    }
++
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 0000000,0fe08c9..318901f
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@@ -1,0 -1,107 +1,113 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.processor;
+
+ import java.util.Map;
++import java.util.Set;
+
+ import org.apache.nifi.components.PropertyDescriptor;
+ import org.apache.nifi.components.PropertyValue;
+ import org.apache.nifi.controller.ControllerServiceLookup;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+
+ /**
+  * {@link SchedulingContext} that forwards every {@link ProcessContext}
+  * operation to a wrapped context and adds the ability to lease a
+  * controller service on behalf of a processor node.
+  */
+ public class StandardSchedulingContext implements SchedulingContext {
+
+     private final ProcessContext processContext;
+     private final ControllerServiceProvider serviceProvider;
+     private final ProcessorNode processorNode;
+
+     /**
+      * @param processContext the context all property/yield/crypto calls are forwarded to
+      * @param serviceProvider used to resolve controller service nodes by identifier
+      * @param processorNode the node registered as a reference on leased services
+      */
+     public StandardSchedulingContext(final ProcessContext processContext, final ControllerServiceProvider serviceProvider, final ProcessorNode processorNode) {
+         this.processContext = processContext;
+         this.serviceProvider = serviceProvider;
+         this.processorNode = processorNode;
+     }
+
+     /**
+      * Registers the processor node as a reference on the identified service.
+      *
+      * @throws IllegalArgumentException if no service exists with the identifier
+      * @throws IllegalStateException if the service is disabled or not valid
+      */
+     @Override
+     public void leaseControllerService(final String identifier) {
+         findLeasableService(identifier).addReference(processorNode);
+     }
+
+     /**
+      * Resolves the service node for the identifier and checks, in order, that
+      * it exists, is not disabled, and is valid.
+      */
+     private ControllerServiceNode findLeasableService(final String identifier) {
+         final ControllerServiceNode node = serviceProvider.getControllerServiceNode(identifier);
+         if (node == null) {
+             throw new IllegalArgumentException("Cannot lease Controller Service because no Controller Service exists with identifier " + identifier);
+         }
+
+         if (node.isDisabled()) {
+             throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getControllerService() + " is currently disabled");
+         }
+
+         final boolean valid = node.isValid();
+         if (!valid) {
+             throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + node.getControllerService() + " is not currently valid");
+         }
+
+         return node;
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public PropertyValue getProperty(final PropertyDescriptor descriptor) {
+         return processContext.getProperty(descriptor);
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public PropertyValue getProperty(final String propertyName) {
+         return processContext.getProperty(propertyName);
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public PropertyValue newPropertyValue(final String rawValue) {
+         return processContext.newPropertyValue(rawValue);
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public void yield() {
+         processContext.yield();
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public int getMaxConcurrentTasks() {
+         return processContext.getMaxConcurrentTasks();
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public String getAnnotationData() {
+         return processContext.getAnnotationData();
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public Map<PropertyDescriptor, String> getProperties() {
+         return processContext.getProperties();
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public String encrypt(final String unencrypted) {
+         return processContext.encrypt(unencrypted);
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public String decrypt(final String encrypted) {
+         return processContext.decrypt(encrypted);
+     }
+
+     /** Forwards to the wrapped context. */
+     @Override
+     public ControllerServiceLookup getControllerServiceLookup() {
+         return processContext.getControllerServiceLookup();
+     }
++
++    /** Forwards to the wrapped context. */
++    @Override
++    public Set<Relationship> getAvailableRelationships() {
++        return processContext.getAvailableRelationships();
++    }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 0000000,22ec983..d4b4f61
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@@ -1,0 -1,510 +1,510 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.remote.protocol.socket;
+
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.zip.CRC32;
+ import java.util.zip.CheckedInputStream;
+ import java.util.zip.CheckedOutputStream;
+
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.remote.Peer;
+ import org.apache.nifi.remote.PeerStatus;
+ import org.apache.nifi.remote.RemoteGroupPort;
+ import org.apache.nifi.remote.RemoteResourceFactory;
+ import org.apache.nifi.remote.StandardVersionNegotiator;
+ import org.apache.nifi.remote.VersionNegotiator;
+ import org.apache.nifi.remote.codec.FlowFileCodec;
+ import org.apache.nifi.remote.codec.StandardFlowFileCodec;
+ import org.apache.nifi.remote.exception.HandshakeException;
+ import org.apache.nifi.remote.exception.ProtocolException;
+ import org.apache.nifi.remote.io.CompressionInputStream;
+ import org.apache.nifi.remote.io.CompressionOutputStream;
+ import org.apache.nifi.remote.protocol.ClientProtocol;
+ import org.apache.nifi.remote.protocol.CommunicationsSession;
+ import org.apache.nifi.remote.protocol.RequestType;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.StopWatch;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class SocketClientProtocol implements ClientProtocol {
+     // Negotiator constructed with protocol versions 4, 3, 2 and 1.
+     private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+
+
+     // Remote port this protocol talks to; assigned by setPort together with useCompression.
+     private RemoteGroupPort port;
+     private boolean useCompression;
+
+     // Random identifier generated at the start of each handshake.
+     private String commsIdentifier;
+     private boolean handshakeComplete = false;
+
+     // Shared across instances: the logger is keyed on the class, so one per instance is unnecessary.
+     private static final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);
+
+     // Outcome of the handshake; readyForFileTransfer is set only on PROPERTIES_OK.
+     private Response handshakeResponse = null;
+     private boolean readyForFileTransfer = false;
+     // Peer URL with a trailing '/', set during handshake when the negotiated version is >= 3.
+     private String transitUriPrefix = null;
+
+     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+
+     /**
+      * Creates a protocol with no port assigned. {@link #handshake(Peer)} reads
+      * the port, so {@link #setPort(RemoteGroupPort)} has to be called first.
+      */
+     public SocketClientProtocol() {
+         // nothing to initialise beyond the field defaults
+     }
+
+     /**
+      * Assigns the remote port and records whether that port uses compression.
+      *
+      * @param port the remote group port this protocol will communicate for
+      */
+     public void setPort(final RemoteGroupPort port) {
+         this.port = port;
+         final boolean compressionEnabled = port.isUseCompression();
+         this.useCompression = compressionEnabled;
+     }
+
+     /**
+      * Performs the one-time handshake with the given peer: writes a fresh
+      * communications identifier, the peer URL (negotiated version 3 and
+      * above) and the handshake properties, then reads the peer's response.
+      * {@code PROPERTIES_OK} marks this protocol ready for file transfer;
+      * {@code PORT_NOT_IN_VALID_STATE}, {@code UNKNOWN_PORT} and
+      * {@code PORTS_DESTINATION_FULL} complete the handshake without making it
+      * ready. Any other response closes the peer.
+      *
+      * @throws IllegalStateException if the handshake was already completed
+      * @throws HandshakeException if the response cannot be read or is unexpected
+      * @throws IOException if communication with the peer fails
+      */
+     @Override
+     public void handshake(final Peer peer) throws IOException, HandshakeException {
+         if ( handshakeComplete ) {
+             throw new IllegalStateException("Handshake has already been completed");
+         }
+         commsIdentifier = UUID.randomUUID().toString();
+         logger.debug("{} handshaking with {}", this, peer);
+
+         final Map<HandshakeProperty, String> properties = new HashMap<>();
+         properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression));
+         properties.put(HandshakeProperty.PORT_IDENTIFIER, port.getIdentifier());
+         properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(
+             port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)) );
+
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         commsSession.setTimeout(port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS));
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+         dos.writeUTF(commsIdentifier);
+
+         if ( versionNegotiator.getVersion() >= 3 ) {
+             dos.writeUTF(peer.getUrl());
+             transitUriPrefix = peer.getUrl();
+
+             // Normalise so a FlowFile identifier can be appended directly.
+             if ( !transitUriPrefix.endsWith("/") ) {
+                 transitUriPrefix = transitUriPrefix + "/";
+             }
+         }
+
+         dos.writeInt(properties.size());
+         for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) {
+             dos.writeUTF(entry.getKey().name());
+             dos.writeUTF(entry.getValue());
+         }
+
+         dos.flush();
+
+         try {
+             handshakeResponse = Response.read(dis);
+         } catch (final ProtocolException e) {
+             throw new HandshakeException(e);
+         }
+
+         switch (handshakeResponse.getCode()) {
+             case PORT_NOT_IN_VALID_STATE:
+             case UNKNOWN_PORT:
+             case PORTS_DESTINATION_FULL:
+                 // Handshake finishes, but the port cannot be used for transfer.
+                 break;
+             case PROPERTIES_OK:
+                 readyForFileTransfer = true;
+                 break;
+             default:
+                 // This is the handshake, not codec negotiation; say so in the log.
+                 logger.error("{} received unexpected response {} from {} when performing handshake", new Object[] {
+                     this, handshakeResponse, peer});
+                 peer.close();
+                 throw new HandshakeException("Received unexpected response " + handshakeResponse);
+         }
+
+         logger.debug("{} Finished handshake with {}", this, peer);
+         handshakeComplete = true;
+     }
+
+     /**
+      * @return {@code true} once a handshake has been answered with
+      *         {@code PROPERTIES_OK}; {@code false} otherwise
+      */
+     public boolean isReadyForFileTransfer() {
+         final boolean ready = readyForFileTransfer;
+         return ready;
+     }
+
+     /**
+      * @return whether the handshake response was {@code PORT_NOT_IN_VALID_STATE}
+      * @throws IllegalStateException if the handshake has not completed
+      */
+     public boolean isPortInvalid() {
+         if ( handshakeComplete ) {
+             return ResponseCode.PORT_NOT_IN_VALID_STATE == handshakeResponse.getCode();
+         }
+         throw new IllegalStateException("Handshake has not completed successfully");
+     }
+
+     /**
+      * @return whether the handshake response was {@code UNKNOWN_PORT}
+      * @throws IllegalStateException if the handshake has not completed
+      */
+     public boolean isPortUnknown() {
+         if ( handshakeComplete ) {
+             return ResponseCode.UNKNOWN_PORT == handshakeResponse.getCode();
+         }
+         throw new IllegalStateException("Handshake has not completed successfully");
+     }
+
+     /**
+      * @return whether the handshake response was {@code PORTS_DESTINATION_FULL}
+      * @throws IllegalStateException if the handshake has not completed
+      */
+     public boolean isDestinationFull() {
+         if ( handshakeComplete ) {
+             return ResponseCode.PORTS_DESTINATION_FULL == handshakeResponse.getCode();
+         }
+         throw new IllegalStateException("Handshake has not completed successfully");
+     }
+
+     /**
+      * Requests the peer list from the given peer and decodes it. The peer
+      * answers with a count followed by, for each entry, a hostname, port,
+      * secure flag and FlowFile count.
+      *
+      * @return the decoded peer statuses
+      * @throws IllegalStateException if the handshake has not been performed
+      * @throws IOException if communication fails or the peer reports a negative count
+      */
+     @Override
+     public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+
+         logger.debug("{} Get Peer Statuses from {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+         RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
+         dos.flush();
+         final int numPeers = dis.readInt();
+         if ( numPeers < 0 ) {
+             // The count comes off the wire; reject garbage as a communication
+             // failure instead of letting HashSet throw IllegalArgumentException.
+             throw new IOException(this + " received invalid peer count " + numPeers + " from " + peer);
+         }
+
+         // Not pre-sized from the wire value, so a corrupt count cannot force a huge allocation.
+         final Set<PeerStatus> peers = new HashSet<>();
+         for (int i=0; i < numPeers; i++) {
+             final String hostname = dis.readUTF();
+             final int peerPort = dis.readInt();
+             final boolean secure = dis.readBoolean();
+             final int flowFileCount = dis.readInt();
+             peers.add(new PeerStatus(hostname, peerPort, secure, flowFileCount));
+         }
+
+         logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer);
+         return peers;
+     }
+
+     /**
+      * Negotiates the FlowFile codec with the given peer, starting from a
+      * {@link StandardFlowFileCodec}.
+      *
+      * @return the codec agreed on with the peer
+      * @throws IllegalStateException if the handshake has not been performed
+      * @throws ProtocolException if resource negotiation fails; the
+      *         underlying {@link HandshakeException} is attached as the cause
+      * @throws IOException if communication with the peer fails
+      */
+     @Override
+     public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+
+         logger.debug("{} Negotiating Codec with {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+         // NOTE(review): no flush here; presumably initiateResourceNegotiation flushes - confirm.
+         RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);
+
+         FlowFileCodec codec = new StandardFlowFileCodec();
+         try {
+             codec = (FlowFileCodec) RemoteResourceFactory.initiateResourceNegotiation(codec, dis, dos);
+         } catch (HandshakeException e) {
+             // Same message as before, but keep the original exception as the
+             // cause so its stack trace is not lost.
+             final ProtocolException protocolException = new ProtocolException(e.toString());
+             protocolException.initCause(e);
+             throw protocolException;
+         }
+         logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession});
+
+         return codec;
+     }
+
+
+     /**
+      * Pulls FlowFiles from the given peer in a single transaction. Each
+      * FlowFile is decoded, given a new UUID, reported to provenance and
+      * transferred to {@link Relationship#ANONYMOUS} until the peer signals
+      * {@code FINISH_TRANSACTION}. The CRC32 of the received data is then sent
+      * back with {@code CONFIRM_TRANSACTION}; the session is committed only
+      * after the peer confirms, and the peer is finally told whether this
+      * side's destination is now full.
+      *
+      * @param peer the peer to receive from
+      * @param context used to check which relationships are still available
+      * @param session the session FlowFiles are created in and committed with
+      * @param codec decodes FlowFiles from the stream
+      * @throws IllegalStateException if the handshake has not been performed
+      *         or did not leave the protocol ready for file transfer
+      * @throws IOException on communication failure or a {@code BAD_CHECKSUM} response
+      * @throws ProtocolException if the peer sends an unexpected response
+      */
+     @Override
+     public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+         if ( !handshakeComplete ) {
+             throw new IllegalStateException("Handshake has not been performed");
+         }
+         if ( !readyForFileTransfer ) {
+             throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse);
+         }
+
+         logger.debug("{} Receiving FlowFiles from {}", this, peer);
+         final CommunicationsSession commsSession = peer.getCommunicationsSession();
+         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+         final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+         String userDn = commsSession.getUserDn();
+         if ( userDn == null ) {
+             userDn = "none";
+         }
+
+         // Indicate that we would like to have some data
+         RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
+         dos.flush();
+
+         // Determine if Peer will send us data or has no data to send us
+         final Response dataAvailableCode = Response.read(dis);
+         switch (dataAvailableCode.getCode()) {
+             case MORE_DATA:
+                 logger.debug("{} {} Indicates that data is available", this, peer);
+                 break;
+             case NO_MORE_DATA:
+                 // Two placeholders need two arguments: this protocol and the peer.
+                 logger.debug("{} No data available from {}", this, peer);
+                 return;
+             default:
+                 throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
+         }
+
+         final StopWatch stopWatch = new StopWatch(true);
+         final Set<FlowFile> flowFilesReceived = new HashSet<>();
+         long bytesReceived = 0L;
+         final CRC32 crc = new CRC32();
+
+         // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data.
+         boolean continueTransaction = true;
+         String calculatedCRC = "";
+         while (continueTransaction) {
+             // A new stream wrapper per FlowFile, all feeding the same CRC32 instance.
+             final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis;
+             final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);
+
+             final long startNanos = System.nanoTime();
+             FlowFile flowFile = codec.decode(checkedIn, session);
+             final long transmissionNanos = System.nanoTime() - startNanos;
+             final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);
+
+             // Keep the sender's identifier for provenance, then assign a fresh local UUID.
+             final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key());
+             flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+             final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier;
+             session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis);
+
+             session.transfer(flowFile, Relationship.ANONYMOUS);
+             bytesReceived += flowFile.getSize();
+             flowFilesReceived.add(flowFile);
+             logger.debug("{} Received {} from {}", this, flowFile, peer);
+
+             final Response transactionCode = Response.read(dis);
+             switch (transactionCode.getCode()) {
+                 case CONTINUE_TRANSACTION:
+                     logger.trace("{} Received ContinueTransaction indicator from {}", this, peer);
+                     break;
+                 case FINISH_TRANSACTION:
+                     logger.trace("{} Received FinishTransaction indicator from {}", this, peer);
+                     continueTransaction = false;
+                     calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue());
+                     break;
+                 default:
+                     throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received " + transactionCode);
+             }
+         }
+
+         // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+         // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+         // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+         // session and then when we send the response back to the peer, the peer may have timed out and may not
+         // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+         // Critical Section involved in this transaction so that rather than the Critical Section being the
+         // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+         logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+         ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+         final Response confirmTransactionResponse = Response.read(dis);
+         logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+         switch (confirmTransactionResponse.getCode()) {
+             case CONFIRM_TRANSACTION:
+                 break;
+             case BAD_CHECKSUM:
+                 session.rollback();
+                 throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+             default:
+                 throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+         }
+
+         // Commit the session so that we have persisted the data
+         session.commit();
+
-         if ( session.getAvailableRelationships().isEmpty() ) {
++        if ( context.getAvailableRelationships().isEmpty() ) {
+             // Confirm that we received the data and the peer can now discard it but that the peer should not
+             // send any more data for a bit
+             logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+             ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+         } else {
+             // Confirm that we received the data and the peer can now discard it
+             logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+             ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+         }
+
+         stopWatch.stop();
+         // Only list individual FlowFiles when there are few of them.
+         final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+         final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+         final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+         final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+         logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+             this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+     }
+
+ @Override
+ public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( !handshakeComplete ) {
+ throw new IllegalStateException("Handshake has not been performed");
+ }
+ if ( !readyForFileTransfer ) {
+ throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse);
+ }
+
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ logger.debug("{} Sending FlowFiles to {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ String userDn = commsSession.getUserDn();
+ if ( userDn == null ) {
+ userDn = "none";
+ }
+
+ // Indicate that we would like to have some data
+ RequestType.SEND_FLOWFILES.writeRequestType(dos);
+ dos.flush();
+
+ final StopWatch stopWatch = new StopWatch(true);
+ final CRC32 crc = new CRC32();
+
+ long bytesSent = 0L;
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ boolean continueTransaction = true;
+ String calculatedCRC = "";
+ final long startSendingNanos = System.nanoTime();
+ while (continueTransaction) {
+ final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos;
+ logger.debug("{} Sending {} to {}", this, flowFile, peer);
+
+ final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);
+
+ final long startNanos = System.nanoTime();
+ flowFile = codec.encode(flowFile, session, checkedOutStream);
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+
+ // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+ // Otherwise, do NOT close it because we don't want to close the underlying stream
+ // (CompressionOutputStream will not close the underlying stream when it's closed)
+ if ( useCompression ) {
+ checkedOutStream.close();
+ }
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+ logger.debug("{} Sent {} to {}", this, flowFile, peer);
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
+ session.remove(flowFile);
+
+ final long sendingNanos = System.nanoTime() - startSendingNanos;
+ if ( sendingNanos < BATCH_SEND_NANOS ) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ if ( continueTransaction ) {
+ logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+ } else {
+ logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+
+ calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() );
+ }
+ }
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(dis);
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm checksum and echo back the confirmation.
+ logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ if ( versionNegotiator.getVersion() > 3 ) {
+ if ( !receivedCRC.equals(calculatedCRC) ) {
+ ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+ session.rollback();
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+
+ final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(dis);
+ } catch (final IOException e) {
+ logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
+ " It is unknown whether or not the peer successfully received/processed the data." +
+ " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
+ this, peer, session, flowFileDescription);
+ session.rollback();
+ throw e;
+ }
+
+ logger.debug("{} Received {} from {}", this, transactionResponse, peer);
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+
+ // consume input stream entirely, ignoring its contents. If we
+ // don't do this, the Connection will not be returned to the pool
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+
+ session.commit();
+
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+ }
+
+ /**
+ * @return the negotiator that holds the protocol version agreed upon with the peer
+ */
+ @Override
+ public VersionNegotiator getVersionNegotiator() {
+ return this.versionNegotiator;
+ }
+
+ /**
+ * Marks this protocol as no longer ready for file transfer and sends a SHUTDOWN request to the peer.
+ *
+ * @param peer the peer to notify
+ * @throws IOException if the request cannot be written to the peer
+ */
+ @Override
+ public void shutdown(final Peer peer) throws IOException {
+ readyForFileTransfer = false;
+ final OutputStream rawOut = peer.getCommunicationsSession().getOutput().getOutputStream();
+ final DataOutputStream out = new DataOutputStream(rawOut);
+
+ logger.debug("{} Shutting down with {}", this, peer);
+ // Let the peer know that this side is shutting down
+ RequestType.SHUTDOWN.writeRequestType(out);
+ out.flush();
+ }
+
+ /**
+ * @return the fixed resource name of this protocol, "SocketFlowFileProtocol"
+ */
+ @Override
+ public String getResourceName() {
+ final String resourceName = "SocketFlowFileProtocol";
+ return resourceName;
+ }
+
+ /**
+ * @return a description of this protocol instance that includes its communications identifier
+ */
+ @Override
+ public String toString() {
+ final StringBuilder description = new StringBuilder("SocketClientProtocol[CommsID=");
+ description.append(commsIdentifier).append("]");
+ return description.toString();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/73384b23/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --cc nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 0000000,88b6a41..5edd4f9
mode 000000,100644..100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@@ -1,0 -1,581 +1,581 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.remote.protocol.socket;
+
+ import java.io.DataInputStream;
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.net.InetAddress;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.UUID;
+ import java.util.concurrent.TimeUnit;
+ import java.util.zip.CRC32;
+ import java.util.zip.CheckedInputStream;
+ import java.util.zip.CheckedOutputStream;
+
+ import org.apache.nifi.cluster.NodeInformant;
+ import org.apache.nifi.connectable.Connection;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.flowfile.FlowFile;
+ import org.apache.nifi.flowfile.attributes.CoreAttributes;
+ import org.apache.nifi.groups.ProcessGroup;
+ import org.apache.nifi.processor.ProcessContext;
+ import org.apache.nifi.processor.ProcessSession;
+ import org.apache.nifi.processor.Relationship;
+ import org.apache.nifi.remote.Peer;
+ import org.apache.nifi.remote.PortAuthorizationResult;
+ import org.apache.nifi.remote.RemoteResourceFactory;
+ import org.apache.nifi.remote.RootGroupPort;
+ import org.apache.nifi.remote.StandardVersionNegotiator;
+ import org.apache.nifi.remote.VersionNegotiator;
+ import org.apache.nifi.remote.codec.FlowFileCodec;
+ import org.apache.nifi.remote.exception.HandshakeException;
+ import org.apache.nifi.remote.exception.ProtocolException;
+ import org.apache.nifi.remote.io.CompressionInputStream;
+ import org.apache.nifi.remote.io.CompressionOutputStream;
+ import org.apache.nifi.remote.protocol.CommunicationsSession;
+ import org.apache.nifi.remote.protocol.RequestType;
+ import org.apache.nifi.remote.protocol.ServerProtocol;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.StopWatch;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class SocketFlowFileServerProtocol implements ServerProtocol {
+ public static final String RESOURCE_NAME = "SocketFlowFileProtocol"; // value returned by getResourceName()
+
+ private ProcessGroup rootGroup; // set by setRootProcessGroup(); handshake() looks ports up in it
+ private String commsIdentifier; // first value read from the peer in handshake(); shown in toString()
+ private boolean handshakeCompleted; // set to true at the end of handshake()
+
+ private Boolean useGzip; // stays null until the GZIP handshake property has been received
+ private long requestExpirationMillis; // parsed from the REQUEST_EXPIRATION_MILLIS handshake property
+ private RootGroupPort port; // port named by the PORT_IDENTIFIER handshake property
+ private boolean shutdown = false; // set by shutdown(); guarded operations throw IllegalStateException afterwards
+ private FlowFileCodec negotiatedFlowFileCodec = null; // assigned by negotiateCodec()
+ private String transitUriPrefix = null; // read in handshake() for protocol versions >= 3 and normalized to end with "/"
+
+ private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); // presumably the supported protocol versions - TODO confirm
+ private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
+
+ private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
+
+
+ /**
+ * Stores the root process group that is later used to resolve port identifiers.
+ *
+ * @param group the process group to use; must report itself as the root group
+ * @throws IllegalArgumentException if the given group is not the root group
+ */
+ @Override
+ public void setRootProcessGroup(final ProcessGroup group) {
+ final boolean isRoot = group.isRootGroup();
+ if ( isRoot ) {
+ this.rootGroup = group;
+ return;
+ }
+
+ throw new IllegalArgumentException();
+ }
+
+ /**
+ * Performs the server side of the handshake with the given peer.
+ * <p>
+ * Reads the communications identifier, the transit URI prefix (protocol versions 3 and above)
+ * and the handshake properties sent by the peer, then validates them. For an unknown property,
+ * an unknown port or a missing GZIP / PORT_IDENTIFIER property, the matching response code is
+ * written and a HandshakeException is thrown. For an unauthorized user, a port that is invalid or
+ * not running, or a full destination, the matching response code is written but no exception is
+ * thrown: the method still completes and marks the handshake as completed. PROPERTIES_OK is
+ * written only when no other response has been written.
+ *
+ * @param peer the peer to perform the handshake with
+ * @throws IllegalStateException if the handshake has already been completed or the protocol is shut down
+ * @throws IOException if communication with the peer fails
+ * @throws HandshakeException if the peer sends an unknown property, an unknown port identifier,
+ * or omits a required property
+ */
+ @Override
+ public void handshake(final Peer peer) throws IOException, HandshakeException {
+ if ( handshakeCompleted ) {
+ throw new IllegalStateException("Handshake has already been completed");
+ }
+ if ( shutdown ) {
+ throw new IllegalStateException("Protocol is shutdown");
+ }
+
+ logger.debug("{} Handshaking with {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+
+ commsIdentifier = dis.readUTF();
+
+ if ( versionNegotiator.getVersion() >= 3 ) {
+ transitUriPrefix = dis.readUTF();
+ if ( !transitUriPrefix.endsWith("/") ) {
+ transitUriPrefix = transitUriPrefix + "/";
+ }
+ }
+
+ final Map<String, String> properties = new HashMap<>();
+ final int numProperties = dis.readInt();
+ for (int i=0; i < numProperties; i++) {
+ final String propertyName = dis.readUTF();
+ final String propertyValue = dis.readUTF();
+ properties.put(propertyName, propertyValue);
+ }
+
+ // evaluate the properties received
+ boolean responseWritten = false;
+ for ( final Map.Entry<String, String> entry : properties.entrySet() ) {
+ final String propertyName = entry.getKey();
+ final String value = entry.getValue();
+
+ final HandshakeProperty property;
+ try {
+ property = HandshakeProperty.valueOf(propertyName);
+ } catch (final Exception e) {
+ ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName);
+ throw new HandshakeException("Received unknown property: " + propertyName);
+ }
+
+ switch (property) {
+ case GZIP: {
+ useGzip = Boolean.parseBoolean(value);
+ break;
+ }
+ case REQUEST_EXPIRATION_MILLIS:
+ requestExpirationMillis = Long.parseLong(value);
+ break;
+ case PORT_IDENTIFIER: {
+ // the identifier may name either an input port or an output port of the root group
+ Port receivedPort = rootGroup.getInputPort(value);
+ if ( receivedPort == null ) {
+ receivedPort = rootGroup.getOutputPort(value);
+ }
+ if ( receivedPort == null ) {
+ logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+ ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+ throw new HandshakeException("Received unknown port identifier: " + value);
+ }
+ if ( !(receivedPort instanceof RootGroupPort) ) {
+ logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+ ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+ throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
+ }
+
+ this.port = (RootGroupPort) receivedPort;
+ final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+ if ( !portAuthResult.isAuthorized() ) {
+ // the {} placeholder is required; without it the explanation is never written to the log
+ logger.debug("Responding with ResponseCode UNAUTHORIZED: {}", portAuthResult.getExplanation());
+ ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
+ responseWritten = true;
+ break;
+ }
+
+ if ( !receivedPort.isValid() ) {
+ logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+ ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
+ responseWritten = true;
+ break;
+ }
+
+ if ( !receivedPort.isRunning() ) {
+ logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+ ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
+ responseWritten = true;
+ break;
+ }
+
+ // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this:
+ // we will simply not service the request but the sender will timeout
+ if ( getVersionNegotiator().getVersion() > 1 ) {
+ for ( final Connection connection : port.getConnections() ) {
+ if ( connection.getFlowFileQueue().isFull() ) {
+ logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
+ ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
+ responseWritten = true;
+ // this break only leaves the connection loop, so the response is written at most once
+ break;
+ }
+ }
+ }
+
+ break;
+ }
+ }
+ }
+
+ if ( useGzip == null ) {
+ logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
+ ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
+ throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
+ }
+ if ( port == null ) {
+ logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing");
+ ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name());
+ throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name());
+ }
+
+ // send "OK" response
+ if ( !responseWritten ) {
+ ResponseCode.PROPERTIES_OK.writeResponse(dos);
+ }
+
+ logger.debug("{} Finished handshake with {}", this, peer);
+ handshakeCompleted = true;
+ }
+
+ /**
+ * @return true once handshake() has run to completion, false otherwise
+ */
+ @Override
+ public boolean isHandshakeSuccessful() {
+ return this.handshakeCompleted;
+ }
+
+ /**
+ * @return the port selected during the handshake, or null if no port identifier has been processed yet
+ */
+ @Override
+ public RootGroupPort getPort() {
+ return this.port;
+ }
+
+ /**
+ * Negotiates with the peer which FlowFileCodec to use and remembers the outcome so that it can
+ * later be obtained from getPreNegotiatedCodec().
+ *
+ * @param peer the peer to negotiate with
+ * @return the codec that was negotiated
+ * @throws IllegalStateException if the handshake has not been completed or the protocol is shut down
+ * @throws IOException if communication with the peer fails
+ * @throws ProtocolException if the negotiation itself fails with a HandshakeException
+ */
+ @Override
+ public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException {
+ if ( !handshakeCompleted ) {
+ throw new IllegalStateException("Handshake has not been completed");
+ }
+ if ( shutdown ) {
+ throw new IllegalStateException("Protocol is shutdown");
+ }
+
+ logger.debug("{} Negotiating Codec with {} using {}", this, peer, peer.getCommunicationsSession());
+ final CommunicationsSession comms = peer.getCommunicationsSession();
+ final DataInputStream in = new DataInputStream(comms.getInput().getInputStream());
+ final DataOutputStream out = new DataOutputStream(comms.getOutput().getOutputStream());
+
+ // A failed negotiation is reported to the caller as a ProtocolException
+ final FlowFileCodec codec;
+ try {
+ codec = RemoteResourceFactory.receiveCodecNegotiation(in, out);
+ } catch (final HandshakeException e) {
+ throw new ProtocolException(e.toString());
+ }
+
+ negotiatedFlowFileCodec = codec;
+ logger.debug("{} Negotiated Codec {} with {}", this, negotiatedFlowFileCodec, peer);
+ return negotiatedFlowFileCodec;
+ }
+
+ /**
+ * @return the codec stored by the last successful negotiateCodec() call, or null if none has been negotiated
+ */
+ @Override
+ public FlowFileCodec getPreNegotiatedCodec() {
+ return this.negotiatedFlowFileCodec;
+ }
+
+
+ /**
+ * Sends the FlowFiles available in the given session to the peer as a single transaction.
+ * <p>
+ * If the session has no FlowFile, NO_MORE_DATA is written and 0 is returned. Otherwise MORE_DATA is
+ * written and FlowFiles are encoded one after another until the session is empty or BATCH_NANOS has
+ * elapsed. The transaction is closed with a two-phase exchange: the peer answers FINISH_TRANSACTION
+ * with CONFIRM_TRANSACTION (carrying its CRC32 checksum), this side verifies the checksum (protocol
+ * versions above 3 only) and echoes the confirmation, and the session is committed only after the
+ * peer reports that the transaction has finished.
+ *
+ * @param peer the peer to send the FlowFiles to
+ * @param context the process context (not used by this method)
+ * @param session the session FlowFiles are pulled from; committed on success, rolled back on checksum or read failure
+ * @param codec the codec used to encode each FlowFile onto the wire
+ * @return the number of FlowFiles sent
+ * @throws IllegalStateException if the handshake has not been completed or the protocol is shut down
+ * @throws IOException if communication with the peer fails or the checksums do not match
+ * @throws ProtocolException if the peer answers with an unexpected response code
+ */
+ @Override
+ public int transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( !handshakeCompleted ) {
+ throw new IllegalStateException("Handshake has not been completed");
+ }
+ if ( shutdown ) {
+ throw new IllegalStateException("Protocol is shutdown");
+ }
+
+ logger.debug("{} Sending FlowFiles to {}", this, peer);
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ String remoteDn = commsSession.getUserDn();
+ if ( remoteDn == null ) {
+ remoteDn = "none";
+ }
+
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ // we have no data to send. Notify the peer.
+ logger.debug("{} No data to send to {}", this, peer);
+ ResponseCode.NO_MORE_DATA.writeResponse(dos);
+ return 0;
+ }
+
+ // we have data to send.
+ logger.debug("{} Data is available to send to {}", this, peer);
+ ResponseCode.MORE_DATA.writeResponse(dos);
+
+ final StopWatch stopWatch = new StopWatch(true);
+ long bytesSent = 0L;
+ final Set<FlowFile> flowFilesSent = new HashSet<>();
+ // a single CRC32 instance is shared by every FlowFile so the checksum covers the whole transaction
+ final CRC32 crc = new CRC32();
+
+ // send data until we reach some batch size
+ boolean continueTransaction = true;
+ final long startNanos = System.nanoTime();
+ String calculatedCRC = "";
+ while (continueTransaction) {
+ final OutputStream flowFileOutputStream = useGzip ? new CompressionOutputStream(dos) : dos;
+ logger.debug("{} Sending {} to {}", new Object[] {this, flowFile, peer});
+
+ final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
+
+ final StopWatch transferWatch = new StopWatch(true);
+ flowFile = codec.encode(flowFile, session, checkedOutputStream);
+ final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
+
+ // need to close the CompressionOutputStream in order to force it write out any remaining bytes.
+ // Otherwise, do NOT close it because we don't want to close the underlying stream
+ // (CompressionOutputStream will not close the underlying stream when it's closed)
+ if ( useGzip ) {
+ checkedOutputStream.close();
+ }
+
+ flowFilesSent.add(flowFile);
+ bytesSent += flowFile.getSize();
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
+ session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
+ session.remove(flowFile);
+
+ // only pull another FlowFile while we are still inside the batch time window
+ final long sendingNanos = System.nanoTime() - startNanos;
+ if ( sendingNanos < BATCH_NANOS ) {
+ flowFile = session.get();
+ } else {
+ flowFile = null;
+ }
+
+ continueTransaction = (flowFile != null);
+ if ( continueTransaction ) {
+ logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
+ ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
+ } else {
+ logger.debug("{} Sending FinishTransaction indicator to {}", this, peer);
+ ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
+ calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue());
+ }
+ }
+
+ // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response
+ final Response transactionConfirmationResponse = Response.read(dis);
+ if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) {
+ // Confirm Checksum and echo back the confirmation.
+ logger.debug("{} Received {} from {}", this, transactionConfirmationResponse, peer);
+ final String receivedCRC = transactionConfirmationResponse.getMessage();
+
+ if ( versionNegotiator.getVersion() > 3 ) {
+ // compare from the side that is never null so that a response carrying no message
+ // is treated as a checksum mismatch instead of raising a NullPointerException
+ if ( !calculatedCRC.equals(receivedCRC) ) {
+ ResponseCode.BAD_CHECKSUM.writeResponse(dos);
+ session.rollback();
+ throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
+ }
+ }
+
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
+ } else {
+ throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
+ }
+
+ final String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
+
+ final Response transactionResponse;
+ try {
+ transactionResponse = Response.read(dis);
+ } catch (final IOException e) {
+ logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." +
+ " It is unknown whether or not the peer successfully received/processed the data." +
+ " Therefore, {} will be rolled back, possibly resulting in data duplication of {}",
+ this, peer, session, flowFileDescription);
+ session.rollback();
+ throw e;
+ }
+
+ logger.debug("{} received {} from {}", new Object[] {this, transactionResponse, peer});
+ if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) {
+ peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS));
+ } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) {
+ throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
+ }
+
+ // The peer has acknowledged the transaction, so the sent FlowFiles can now be released
+ session.commit();
+
+ stopWatch.stop();
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesSent);
+ logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+
+ return flowFilesSent.size();
+ }
+
+
+ /**
+ * Receives FlowFiles from the peer as a single transaction and transfers them to
+ * Relationship.ANONYMOUS in the given session.
+ * <p>
+ * FlowFiles are decoded until the peer sends FINISH_TRANSACTION. Each received FlowFile is given a
+ * newly generated UUID, and the UUID it had on the sending system is recorded in the provenance
+ * RECEIVE event. This side then sends CONFIRM_TRANSACTION together with its CRC32 checksum, waits for
+ * the peer to confirm, commits the session and finally tells the peer that the transaction is finished
+ * (or finished but the destination is full, when the context has no available relationships).
+ *
+ * @param peer the peer to receive the FlowFiles from
+ * @param context the process context, consulted for the available relationships
+ * @param session the session into which FlowFiles are created; committed on success, rolled back on a bad checksum
+ * @param codec the codec used to decode each FlowFile from the wire
+ * @return the number of FlowFiles received
+ * @throws IllegalStateException if the handshake has not been completed or the protocol is shut down
+ * @throws IOException if communication with the peer fails or the peer reports a bad checksum
+ * @throws ProtocolException if the peer answers with an unexpected response code
+ */
+ @Override
+ public int receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException {
+ if ( !handshakeCompleted ) {
+ throw new IllegalStateException("Handshake has not been completed");
+ }
+ if ( shutdown ) {
+ throw new IllegalStateException("Protocol is shutdown");
+ }
+
+ logger.debug("{} receiving FlowFiles from {}", this, peer);
+
+ final CommunicationsSession commsSession = peer.getCommunicationsSession();
+ final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
+ final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
+ String remoteDn = commsSession.getUserDn();
+ if ( remoteDn == null ) {
+ remoteDn = "none";
+ }
+
+ final StopWatch stopWatch = new StopWatch(true);
+ // a single CRC32 instance is shared by every FlowFile so the checksum covers the whole transaction
+ final CRC32 crc = new CRC32();
+
+ // Peer has data. Otherwise, we would not have been called, because they would not have sent
+ // a SEND_FLOWFILES request to us. Just decode the bytes into FlowFiles until peer says he's
+ // finished sending data.
+ final Set<FlowFile> flowFilesReceived = new HashSet<>();
+ long bytesReceived = 0L;
+ boolean continueTransaction = true;
+ String calculatedCRC = "";
+ while (continueTransaction) {
+ final long startNanos = System.nanoTime();
+ final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis;
+ final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
+
+ FlowFile flowFile = codec.decode(checkedInputStream, session);
+ final long transferNanos = System.nanoTime() - startNanos;
+ final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
+ // remember the sender's UUID for provenance, then give the local copy a UUID of its own
+ final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
+ flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
+
+ final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceSystemFlowFileUuid;
+ session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
+ session.transfer(flowFile, Relationship.ANONYMOUS);
+ flowFilesReceived.add(flowFile);
+ bytesReceived += flowFile.getSize();
+
+ final Response transactionResponse = Response.read(dis);
+ switch (transactionResponse.getCode()) {
+ case CONTINUE_TRANSACTION:
+ logger.debug("{} Received ContinueTransaction indicator from {}", this, peer);
+ break;
+ case FINISH_TRANSACTION:
+ logger.debug("{} Received FinishTransaction indicator from {}", this, peer);
+ continueTransaction = false;
+ calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue());
+ break;
+ default:
+ throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received " + transactionResponse);
+ }
+ }
+
+ // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message
+ // to peer so that we can verify that the connection is still open. This is a two-phase commit,
+ // which helps to prevent the chances of data duplication. Without doing this, we may commit the
+ // session and then when we send the response back to the peer, the peer may have timed out and may not
+ // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the
+ // Critical Section involved in this transaction so that rather than the Critical Section being the
+ // time window involved in the entire transaction, it is reduced to a simple round-trip conversation.
+ logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
+ ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
+
+ final Response confirmTransactionResponse = Response.read(dis);
+ logger.debug("{} Received {} from {}", this, confirmTransactionResponse, peer);
+
+ switch (confirmTransactionResponse.getCode()) {
+ case CONFIRM_TRANSACTION:
+ break;
+ case BAD_CHECKSUM:
+ session.rollback();
+ throw new IOException(this + " Received a BadChecksum response from peer " + peer);
+ default:
+ throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
+ }
+
+ // Commit the session so that we have persisted the data
+ session.commit();
+
- if ( session.getAvailableRelationships().isEmpty() ) {
++ if ( context.getAvailableRelationships().isEmpty() ) {
+ // Confirm that we received the data and the peer can now discard it but that the peer should not
+ // send any more data for a bit
+ logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
+ } else {
+ // Confirm that we received the data and the peer can now discard it
+ logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
+ ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
+ }
+
+ stopWatch.stop();
+ final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
+ final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
+ final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+ final String dataSize = FormatUtils.formatDataSize(bytesReceived);
+ logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] {
+ this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
+
+ return flowFilesReceived.size();
+ }
+
+ /**
+ * Reads the next request type sent by the peer.
+ *
+ * @param peer the peer to read the request type from
+ * @return the request type that was read
+ * @throws IllegalStateException if the handshake has not been completed or the protocol is shut down
+ * @throws IOException if the request type cannot be read from the peer
+ */
+ @Override
+ public RequestType getRequestType(final Peer peer) throws IOException {
+ if ( !handshakeCompleted ) {
+ throw new IllegalStateException("Handshake has not been completed");
+ }
+ if ( shutdown ) {
+ throw new IllegalStateException("Protocol is shutdown");
+ }
+
+ logger.debug("{} Reading Request Type from {} using {}", this, peer, peer.getCommunicationsSession());
+ final InputStream rawIn = peer.getCommunicationsSession().getInput().getInputStream();
+ final RequestType received = RequestType.readRequestType(new DataInputStream(rawIn));
+ logger.debug("{} Got Request Type {} from {}", this, received, peer);
+
+ return received;
+ }
+
+ /**
+ * @return the negotiator that holds the protocol version used by this server protocol
+ */
+ @Override
+ public VersionNegotiator getVersionNegotiator() {
+ return this.versionNegotiator;
+ }
+
+ /**
+ * Marks this protocol as shut down. Nothing is sent to the peer.
+ *
+ * @param peer the peer this protocol was communicating with; only used for logging
+ */
+ @Override
+ public void shutdown(final Peer peer) {
+ logger.debug("{} Shutting down with {}", this, peer);
+ this.shutdown = true;
+ }
+
+ /**
+ * @return true if shutdown() has been called on this protocol, false otherwise
+ */
+ @Override
+ public boolean isShutdown() {
+ return this.shutdown;
+ }
+
+ /**
+ * Sends the list of peers to the given peer. The list always holds exactly one entry describing
+ * this host: its host name, its remote input port, whether site-to-site is secure, and a FlowFile
+ * count of 0.
+ *
+ * @param peer the peer to send the list to
+ * @throws IllegalStateException if the handshake has not been completed or the protocol is shut down
+ * @throws IOException if the list cannot be written to the peer
+ */
+ @Override
+ public void sendPeerList(final Peer peer) throws IOException {
+ if ( !handshakeCompleted ) {
+ throw new IllegalStateException("Handshake has not been completed");
+ }
+ if ( shutdown ) {
+ throw new IllegalStateException("Protocol is shutdown");
+ }
+
+ logger.debug("{} Sending Peer List to {}", this, peer);
+ final CommunicationsSession comms = peer.getCommunicationsSession();
+ final DataOutputStream out = new DataOutputStream(comms.getOutput().getOutputStream());
+
+ final NiFiProperties nifiProperties = NiFiProperties.getInstance();
+
+ // the peer count comes first; this host is the only peer
+ out.writeInt(1);
+
+ final String localHostName = InetAddress.getLocalHost().getHostName();
+ out.writeUTF(localHostName);
+
+ final int remoteInputPort = nifiProperties.getRemoteInputPort();
+ out.writeInt(remoteInputPort);
+
+ final boolean secure = nifiProperties.isSiteToSiteSecure();
+ out.writeBoolean(secure);
+
+ // the FlowFile count is irrelevant because there is no other host to balance against
+ out.writeInt(0);
+ out.flush();
+ }
+
+ /**
+ * @return the resource name of this protocol, as defined by RESOURCE_NAME
+ */
+ @Override
+ public String getResourceName() {
+ return SocketFlowFileServerProtocol.RESOURCE_NAME;
+ }
+
+ /**
+ * Does nothing; this protocol implementation ignores the given node informant.
+ *
+ * @param nodeInformant ignored
+ */
+ @Override
+ public void setNodeInformant(final NodeInformant nodeInformant) {
+ // intentionally empty
+ }
+
+ /**
+ * @return the request expiration, in milliseconds, received during the handshake
+ * (0 if the peer did not send the REQUEST_EXPIRATION_MILLIS property)
+ */
+ @Override
+ public long getRequestExpiration() {
+ return this.requestExpirationMillis;
+ }
+
+ /**
+ * @return a description of this protocol instance that includes its communications identifier
+ */
+ @Override
+ public String toString() {
+ final StringBuilder description = new StringBuilder("SocketFlowFileServerProtocol[CommsID=");
+ description.append(commsIdentifier).append("]");
+ return description.toString();
+ }
+ }