You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/04 02:33:53 UTC
[1/3] incubator-nifi git commit: NIFI-282: Add SiteToSiteConfig object
Repository: incubator-nifi
Updated Branches:
refs/heads/site-to-site-client 1a08fead3 -> 1d4b18480
NIFI-282: Add SiteToSiteConfig object
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bbe335dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bbe335dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bbe335dc
Branch: refs/heads/site-to-site-client
Commit: bbe335dc6d1b9ffcdf973600aff7712444937a49
Parents: 1a08fea
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jan 27 21:15:33 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Jan 27 21:15:33 2015 -0500
----------------------------------------------------------------------
.../nifi/remote/client/SiteToSiteClient.java | 50 +++++++++++-
.../remote/client/SiteToSiteClientConfig.java | 84 ++++++++++++++++++++
.../nifi/remote/client/socket/SocketClient.java | 15 ++--
3 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 9a63e5b..f4d6f17 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -250,7 +250,55 @@ public interface SiteToSiteClient extends Closeable {
throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
}
- return new SocketClient(this);
+ final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+
+ @Override
+ public boolean isUseCompression() {
+ return Builder.this.isUseCompression();
+ }
+
+ @Override
+ public String getUrl() {
+ return Builder.this.getUrl();
+ }
+
+ @Override
+ public long getTimeout(final TimeUnit timeUnit) {
+ return Builder.this.getTimeout(timeUnit);
+ }
+
+ @Override
+ public SSLContext getSslContext() {
+ return Builder.this.getSslContext();
+ }
+
+ @Override
+ public String getPortName() {
+ return Builder.this.getPortName();
+ }
+
+ @Override
+ public String getPortIdentifier() {
+ return Builder.this.getPortIdentifier();
+ }
+
+ @Override
+ public long getPenalizationPeriod(final TimeUnit timeUnit) {
+ return Builder.this.getPenalizationPeriod(timeUnit);
+ }
+
+ @Override
+ public File getPeerPersistenceFile() {
+ return Builder.this.getPeerPersistenceFile();
+ }
+
+ @Override
+ public EventReporter getEventReporter() {
+ return Builder.this.getEventReporter();
+ }
+ };
+
+ return new SocketClient(config);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
new file mode 100644
index 0000000..6ba2d3f
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+
+public interface SiteToSiteClientConfig {
+
+ /**
+ * Returns the configured URL for the remote NiFi instance
+ * @return
+ */
+ String getUrl();
+
+ /**
+ * Returns the communications timeout, expressed in the given time unit
+ * @return
+ */
+ long getTimeout(final TimeUnit timeUnit);
+
+ /**
+ * Returns the amount of time that a particular node will be ignored after a
+ * communications error with that node occurs
+ * @param timeUnit
+ * @return
+ */
+ long getPenalizationPeriod(TimeUnit timeUnit);
+
+ /**
+ * Returns the SSL Context that is configured for this client
+ * @return
+ */
+ SSLContext getSslContext();
+
+ /**
+ * Returns the EventReporter that is to be used by clients to report events
+ * @return
+ */
+ EventReporter getEventReporter();
+
+ /**
+ * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
+ * @return
+ */
+ File getPeerPersistenceFile();
+
+ /**
+ * Returns a boolean indicating whether or not compression will be used to transfer data
+ * to and from the remote instance
+ * @return
+ */
+ boolean isUseCompression();
+
+ /**
+ * Returns the name of the port that the client is to communicate with.
+ * @return
+ */
+ String getPortName();
+
+ /**
+ * Returns the identifier of the port that the client is to communicate with.
+ * @return
+ */
+ String getPortIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 3314c52..c04a90b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -23,6 +23,7 @@ import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
@@ -37,14 +38,14 @@ public class SocketClient implements SiteToSiteClient {
private final long penalizationNanos;
private volatile String portIdentifier;
- public SocketClient(final Builder builder) {
- pool = new EndpointConnectionStatePool(builder.getUrl(), (int) builder.getTimeout(TimeUnit.MILLISECONDS),
- builder.getSslContext(), builder.getEventReporter(), builder.getPeerPersistenceFile());
+ public SocketClient(final SiteToSiteClientConfig config) {
+ pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
+ config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
- this.compress = builder.isUseCompression();
- this.portIdentifier = builder.getPortIdentifier();
- this.portName = builder.getPortName();
- this.penalizationNanos = builder.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+ this.compress = config.isUseCompression();
+ this.portIdentifier = config.getPortIdentifier();
+ this.portName = config.getPortName();
+ this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
}
[2/3] incubator-nifi git commit: NIFI-282: Added ability for client
to request batch size and duration when pulling data from remote NiFi
Posted by ma...@apache.org.
NIFI-282: Added ability for client to request batch size and duration when pulling data from remote NiFi
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/20557d38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/20557d38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/20557d38
Branch: refs/heads/site-to-site-client
Commit: 20557d386c6fb049836be0109212a903ed818f54
Parents: bbe335d
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 20:42:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 20:42:34 2015 -0500
----------------------------------------------------------------------
.../nifi/remote/client/SiteToSiteClient.java | 56 +++++++
.../remote/client/SiteToSiteClientConfig.java | 30 ++++
.../socket/EndpointConnectionStatePool.java | 16 ++
.../protocol/socket/HandshakeProperty.java | 40 ++++-
.../protocol/socket/SocketClientProtocol.java | 30 +++-
.../socket/SocketFlowFileServerProtocol.java | 157 ++++++++++++-------
6 files changed, 270 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index f4d6f17..0a05c58 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -27,6 +27,7 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.protocol.DataPacket;
/**
* <p>
@@ -113,6 +114,9 @@ public interface SiteToSiteClient extends Closeable {
private boolean useCompression;
private String portName;
private String portIdentifier;
+ private int batchCount;
+ private long batchSize;
+ private long batchNanos;
/**
* Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of
@@ -238,6 +242,43 @@ public interface SiteToSiteClient extends Closeable {
}
/**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This method specifies
+ * the preferred number of {@link DataPacket}s to include in a Transaction.
+ *
+ * @return this Builder, so that calls can be chained
+ */
+ public Builder requestBatchCount(final int count) {
+ this.batchCount = count;
+ return this;
+ }
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This method specifies
+ * the preferred number of bytes to include in a Transaction.
+ *
+ * @return this Builder, so that calls can be chained
+ */
+ public Builder requestBatchSize(final long bytes) {
+ this.batchSize = bytes;
+ return this;
+ }
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This method specifies
+ * the preferred amount of time that a Transaction should span.
+ *
+ * @return this Builder, so that calls can be chained
+ */
+ public Builder requestBatchDuration(final long value, final TimeUnit unit) {
+ this.batchNanos = unit.toNanos(value);
+ return this;
+ }
+
+
+ /**
* Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi
* @return
*/
@@ -296,6 +337,21 @@ public interface SiteToSiteClient extends Closeable {
public EventReporter getEventReporter() {
return Builder.this.getEventReporter();
}
+
+ @Override
+ public long getPreferredBatchDuration(final TimeUnit timeUnit) {
+ return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public long getPreferredBatchSize() {
+ return Builder.this.batchSize;
+ }
+
+ @Override
+ public int getPreferredBatchCount() {
+ return Builder.this.batchCount;
+ }
};
return new SocketClient(config);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 6ba2d3f..37c48f8 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.protocol.DataPacket;
public interface SiteToSiteClientConfig {
@@ -81,4 +82,33 @@ public interface SiteToSiteClientConfig {
* @return
*/
String getPortIdentifier();
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This returns the maximum
+ * amount of time that we will request a NiFi instance to send data to us in a Transaction.
+ *
+ * @param timeUnit
+ * @return
+ */
+ long getPreferredBatchDuration(TimeUnit timeUnit);
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This returns the maximum
+ * number of bytes that we will request a NiFi instance to send data to us in a Transaction.
+ *
+ * @return
+ */
+ long getPreferredBatchSize();
+
+
+ /**
+ * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+ * the client has the ability to request a particular batch size/duration. This returns the maximum
+ * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction.
+ *
+ * @return
+ */
+ int getPreferredBatchCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index e0ed61f..df42efe 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -63,6 +63,7 @@ import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteDestination;
import org.apache.nifi.remote.RemoteResourceInitiator;
import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -186,7 +187,14 @@ public class EndpointConnectionStatePool {
}, 5, 5, TimeUnit.SECONDS);
}
+
public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+ return getEndpointConnectionState(remoteDestination, direction, null);
+ }
+
+
+
+ public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
//
// Attempt to get a connection state that already exists for this URL.
//
@@ -229,6 +237,14 @@ public class EndpointConnectionStatePool {
final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
+
+ // set properties based on config
+ if ( config != null ) {
+ protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
+ protocol.setPreferredBatchCount(config.getPreferredBatchCount());
+ protocol.setPreferredBatchSize(config.getPreferredBatchSize());
+ protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
+ }
// perform handshake
try {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
index c4519cd..41dc276 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -16,8 +16,46 @@
*/
package org.apache.nifi.remote.protocol.socket;
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol.
+ */
public enum HandshakeProperty {
+ /**
+ * Boolean value indicating whether or not the contents of a FlowFile should be
+ * GZipped when transferred.
+ */
GZIP,
+
+ /**
+ * The unique identifier of the port to communicate with
+ */
PORT_IDENTIFIER,
- REQUEST_EXPIRATION_MILLIS;
+
+ /**
+ * Indicates the number of milliseconds after the request was made that the client
+ * will wait for a response. If no response has been received by the time this value
+ * expires, the server can move on without attempting to service the request because
+ * the client will have already disconnected.
+ */
+ REQUEST_EXPIRATION_MILLIS,
+
+ /**
+ * The preferred number of FlowFiles that the server should send to the client
+ * when pulling data. This property was introduced in version 5 of the protocol.
+ */
+ BATCH_COUNT,
+
+ /**
+ * The preferred number of bytes that the server should send to the client when
+ * pulling data. This property was introduced in version 5 of the protocol.
+ */
+ BATCH_SIZE,
+
+ /**
+ * The preferred amount of time that the server should send data to the client
+ * when pulling data. This property was introduced in version 5 of the protocol.
+ * Value is in milliseconds.
+ */
+ BATCH_DURATION;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 4222edf..6976cd8 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -56,7 +56,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SocketClientProtocol implements ClientProtocol {
- private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+ private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
private RemoteDestination destination;
private boolean useCompression = false;
@@ -70,12 +70,28 @@ public class SocketClientProtocol implements ClientProtocol {
private boolean readyForFileTransfer = false;
private String transitUriPrefix = null;
private int timeoutMillis = 30000;
+
+ private int batchCount;
+ private long batchSize;
+ private long batchMillis;
private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
public SocketClientProtocol() {
}
+ public void setPreferredBatchCount(final int count) {
+ this.batchCount = count;
+ }
+
+ public void setPreferredBatchSize(final long bytes) {
+ this.batchSize = bytes;
+ }
+
+ public void setPreferredBatchDuration(final long millis) {
+ this.batchMillis = millis;
+ }
+
public void setDestination(final RemoteDestination destination) {
this.destination = destination;
this.useCompression = destination.isUseCompression();
@@ -106,6 +122,18 @@ public class SocketClientProtocol implements ClientProtocol {
properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
+ if ( versionNegotiator.getVersion() >= 5 ) {
+ if ( batchCount > 0 ) {
+ properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount));
+ }
+ if ( batchSize > 0L ) {
+ properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize));
+ }
+ if ( batchMillis > 0L ) {
+ properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis));
+ }
+ }
+
final CommunicationsSession commsSession = peer.getCommunicationsSession();
commsSession.setTimeout(timeoutMillis);
final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 12c234e..eb22b0e 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -78,10 +78,14 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
private FlowFileCodec negotiatedFlowFileCodec = null;
private String transitUriPrefix = null;
- private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+ private int requestedBatchCount = 0;
+ private long requestedBatchBytes = 0L;
+ private long requestedBatchNanos = 0L;
+ private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
+
+ private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
- private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
@Override
@@ -137,68 +141,90 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
throw new HandshakeException("Received unknown property: " + propertyName);
}
- switch (property) {
- case GZIP: {
- useGzip = Boolean.parseBoolean(value);
- break;
- }
- case REQUEST_EXPIRATION_MILLIS:
- requestExpirationMillis = Long.parseLong(value);
- break;
- case PORT_IDENTIFIER: {
- Port receivedPort = rootGroup.getInputPort(value);
- if ( receivedPort == null ) {
- receivedPort = rootGroup.getOutputPort(value);
- }
- if ( receivedPort == null ) {
- logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
- ResponseCode.UNKNOWN_PORT.writeResponse(dos);
- throw new HandshakeException("Received unknown port identifier: " + value);
- }
- if ( !(receivedPort instanceof RootGroupPort) ) {
- logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
- ResponseCode.UNKNOWN_PORT.writeResponse(dos);
- throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
- }
-
- this.port = (RootGroupPort) receivedPort;
- final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
- if ( !portAuthResult.isAuthorized() ) {
- logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
- ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
- responseWritten = true;
+ try {
+ switch (property) {
+ case GZIP: {
+ useGzip = Boolean.parseBoolean(value);
break;
}
-
- if ( !receivedPort.isValid() ) {
- logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
- ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
- responseWritten = true;
+ case REQUEST_EXPIRATION_MILLIS:
+ requestExpirationMillis = Long.parseLong(value);
break;
- }
-
- if ( !receivedPort.isRunning() ) {
- logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
- ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
- responseWritten = true;
+ case BATCH_COUNT:
+ requestedBatchCount = Integer.parseInt(value);
+ if ( requestedBatchCount < 0 ) {
+ throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value);
+ }
break;
- }
-
- // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this
- // we we will simply not service the request but the sender will timeout
- if ( getVersionNegotiator().getVersion() > 1 ) {
- for ( final Connection connection : port.getConnections() ) {
- if ( connection.getFlowFileQueue().isFull() ) {
- logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
- ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
- responseWritten = true;
- break;
+ case BATCH_SIZE:
+ requestedBatchBytes = Long.parseLong(value);
+ if ( requestedBatchBytes < 0 ) {
+ throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value);
+ }
+ break;
+ case BATCH_DURATION:
+ requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
+ if ( requestedBatchNanos < 0 ) {
+ throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value);
+ }
+ break;
+ case PORT_IDENTIFIER: {
+ Port receivedPort = rootGroup.getInputPort(value);
+ if ( receivedPort == null ) {
+ receivedPort = rootGroup.getOutputPort(value);
+ }
+ if ( receivedPort == null ) {
+ logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+ ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+ throw new HandshakeException("Received unknown port identifier: " + value);
+ }
+ if ( !(receivedPort instanceof RootGroupPort) ) {
+ logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+ ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+ throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
+ }
+
+ this.port = (RootGroupPort) receivedPort;
+ final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+ if ( !portAuthResult.isAuthorized() ) {
+ logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
+ ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
+ responseWritten = true;
+ break;
+ }
+
+ if ( !receivedPort.isValid() ) {
+ logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+ ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
+ responseWritten = true;
+ break;
+ }
+
+ if ( !receivedPort.isRunning() ) {
+ logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+ ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
+ responseWritten = true;
+ break;
+ }
+
+ // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this
+ // we will simply not service the request but the sender will time out
+ if ( getVersionNegotiator().getVersion() > 1 ) {
+ for ( final Connection connection : port.getConnections() ) {
+ if ( connection.getFlowFileQueue().isFull() ) {
+ logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
+ ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
+ responseWritten = true;
+ break;
+ }
}
}
+
+ break;
}
-
- break;
}
+ } catch (final NumberFormatException nfe) {
+ throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value);
}
}
@@ -333,8 +359,25 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
session.remove(flowFile);
+ // determine if we should check for more data on queue.
final long sendingNanos = System.nanoTime() - startNanos;
- if ( sendingNanos < BATCH_NANOS ) {
+ boolean poll = true;
+ if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) {
+ poll = false;
+ }
+ if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) {
+ poll = false;
+ }
+ if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) {
+ poll = false;
+ }
+
+ if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) {
+ poll = (sendingNanos < DEFAULT_BATCH_NANOS);
+ }
+
+ if ( poll ) {
+ // no batch limit (requested duration, size, or count, or the default duration) has been reached, so get more data.
flowFile = session.get();
} else {
flowFile = null;
[3/3] incubator-nifi git commit: NIFI-282: Updated documentation
Posted by ma...@apache.org.
NIFI-282: Updated documentation
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1d4b1848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1d4b1848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1d4b1848
Branch: refs/heads/site-to-site-client
Commit: 1d4b1848087049788232ef433fb5a505cf05bab3
Parents: 20557d3
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Feb 3 20:33:44 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 3 20:33:44 2015 -0500
----------------------------------------------------------------------
.../nifi/remote/client/SiteToSiteClient.java | 8 +++
.../nifi/remote/client/socket/SocketClient.java | 6 ++
.../nifi/remote/SocketRemoteSiteListener.java | 3 +
.../nifi/remote/StandardRemoteGroupPort.java | 72 +++++++++++++++++++-
.../apache/nifi/remote/RemoteDestination.java | 20 ++++++
5 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 0a05c58..fa94b81 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -98,6 +98,14 @@ public interface SiteToSiteClient extends Closeable {
/**
* <p>
+ * Returns the configuration object that was built by the Builder
+ * </p>
+ * @return the configuration object built by the Builder
+ */
+ SiteToSiteClientConfig getConfig();
+
+ /**
+ * <p>
* The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient.
* Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If
* a change in configuration should be desired, the client should be {@link Closeable#close() closed}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index c04a90b..0494d04 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -32,6 +32,7 @@ import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.util.ObjectHolder;
public class SocketClient implements SiteToSiteClient {
+ private final SiteToSiteClientConfig config;
private final EndpointConnectionStatePool pool;
private final boolean compress;
private final String portName;
@@ -42,12 +43,17 @@ public class SocketClient implements SiteToSiteClient {
pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS),
config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
+ this.config = config;
this.compress = config.isUseCompression();
this.portIdentifier = config.getPortIdentifier();
this.portName = config.getPortName();
this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
}
+ @Override
+ public SiteToSiteClientConfig getConfig() {
+ return config;
+ }
@Override
public boolean isSecure() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index f053e65..3295956 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -122,6 +122,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
}
LOG.trace("Got connection");
+ if ( stopped.get() ) {
+ return;
+ }
final Socket socket = acceptedSocket;
final SocketChannel socketChannel = socket.getChannel();
final Thread thread = new Thread(new Runnable() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index a51cdba..3fc2f5a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.remote;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -32,11 +33,13 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.client.socket.EndpointConnectionState;
import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -142,7 +145,74 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
final EndpointConnectionState connectionState;
try {
- connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+ // TODO: TESTING ONLY!! REMOVE!!
+ final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+ @Override
+ public boolean isUseCompression() {
+ return false;
+ }
+
+ @Override
+ public String getUrl() {
+ return null;
+ }
+
+ @Override
+ public long getTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(1, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public SSLContext getSslContext() {
+ return null;
+ }
+
+ @Override
+ public long getPreferredBatchSize() {
+ return 1024 * 1024L;
+ }
+
+ @Override
+ public long getPreferredBatchDuration(TimeUnit timeUnit) {
+ return timeUnit.convert(1, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int getPreferredBatchCount() {
+ return 1;
+ }
+
+ @Override
+ public String getPortName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getPortIdentifier() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public long getPenalizationPeriod(TimeUnit timeUnit) {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public File getPeerPersistenceFile() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public EventReporter getEventReporter() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ };
+ connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection, config);
} catch (final PortNotRunningException e) {
context.yield();
this.targetRunning.set(false);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index 8c972f7..f718581 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -18,10 +18,30 @@ package org.apache.nifi.remote;
import java.util.concurrent.TimeUnit;
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
public interface RemoteDestination {
+ /**
+ * Returns the identifier of the remote destination
+ *
+ * @return the identifier of the remote destination
+ */
String getIdentifier();
+ /**
+ * Returns the amount of time that the system should pause sending to a particular node if it is
+ * unable to send data to or receive data from this endpoint
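+ * @param timeUnit the unit of time in which the yield period should be expressed
+ * @return the yield period, expressed in the given time unit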
+ */
long getYieldPeriod(TimeUnit timeUnit);
+ /**
+ * Returns whether or not compression should be used when transferring data to or receiving
+ * data from the remote endpoint
+ * @return {@code true} if compression should be used, {@code false} otherwise
+ */
boolean isUseCompression();
}