You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/04 02:33:53 UTC

[1/3] incubator-nifi git commit: NIFI-282: Add SiteToSiteConfig object

Repository: incubator-nifi
Updated Branches:
  refs/heads/site-to-site-client 1a08fead3 -> 1d4b18480


NIFI-282: Add SiteToSiteConfig object


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bbe335dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bbe335dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bbe335dc

Branch: refs/heads/site-to-site-client
Commit: bbe335dc6d1b9ffcdf973600aff7712444937a49
Parents: 1a08fea
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jan 27 21:15:33 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Jan 27 21:15:33 2015 -0500

----------------------------------------------------------------------
 .../nifi/remote/client/SiteToSiteClient.java    | 50 +++++++++++-
 .../remote/client/SiteToSiteClientConfig.java   | 84 ++++++++++++++++++++
 .../nifi/remote/client/socket/SocketClient.java | 15 ++--
 3 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 9a63e5b..f4d6f17 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -250,7 +250,55 @@ public interface SiteToSiteClient extends Closeable {
 				throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
 			}
 			
-			return new SocketClient(this);
+			final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+				
+				@Override
+				public boolean isUseCompression() {
+					return Builder.this.isUseCompression();
+				}
+				
+				@Override
+				public String getUrl() {
+					return Builder.this.getUrl();
+				}
+				
+				@Override
+				public long getTimeout(final TimeUnit timeUnit) {
+					return Builder.this.getTimeout(timeUnit);
+				}
+				
+				@Override
+				public SSLContext getSslContext() {
+					return Builder.this.getSslContext();
+				}
+				
+				@Override
+				public String getPortName() {
+					return Builder.this.getPortName();
+				}
+				
+				@Override
+				public String getPortIdentifier() {
+					return Builder.this.getPortIdentifier();
+				}
+				
+				@Override
+				public long getPenalizationPeriod(final TimeUnit timeUnit) {
+					return Builder.this.getPenalizationPeriod(timeUnit);
+				}
+				
+				@Override
+				public File getPeerPersistenceFile() {
+					return Builder.this.getPeerPersistenceFile();
+				}
+				
+				@Override
+				public EventReporter getEventReporter() {
+					return Builder.this.getEventReporter();
+				}
+			};
+			
+			return new SocketClient(config);
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
new file mode 100644
index 0000000..6ba2d3f
--- /dev/null
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.nifi.events.EventReporter;
+
+public interface SiteToSiteClientConfig {
+
+	/**
+	 * Returns the configured URL for the remote NiFi instance
+	 * @return
+	 */
+	String getUrl();
+
+	/**
+	 * Returns the communications timeout in nanoseconds
+	 * @return
+	 */
+	long getTimeout(final TimeUnit timeUnit);
+
+	/**
+	 * Returns the amount of time that a particular node will be ignored after a
+	 * communications error with that node occurs
+	 * @param timeUnit
+	 * @return
+	 */
+	long getPenalizationPeriod(TimeUnit timeUnit);
+
+	/**
+	 * Returns the SSL Context that is configured for this builder
+	 * @return
+	 */
+	SSLContext getSslContext();
+	
+	/**
+	 * Returns the EventReporter that is to be used by clients to report events
+	 * @return
+	 */
+	EventReporter getEventReporter();
+
+	/**
+	 * Returns the file that is to be used for persisting the nodes of a remote cluster, if any.
+	 * @return
+	 */
+	File getPeerPersistenceFile();
+
+	/**
+	 * Returns a boolean indicating whether or not compression will be used to transfer data
+	 * to and from the remote instance
+	 * @return
+	 */
+	boolean isUseCompression();
+
+	/**
+	 * Returns the name of the port that the client is to communicate with.
+	 * @return
+	 */
+	String getPortName();
+
+	/**
+	 * Returns the identifier of the port that the client is to communicate with.
+	 * @return
+	 */
+	String getPortIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bbe335dc/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index 3314c52..c04a90b 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -23,6 +23,7 @@ import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.SiteToSiteClient;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.exception.HandshakeException;
 import org.apache.nifi.remote.exception.PortNotRunningException;
 import org.apache.nifi.remote.exception.ProtocolException;
@@ -37,14 +38,14 @@ public class SocketClient implements SiteToSiteClient {
 	private final long penalizationNanos;
 	private volatile String portIdentifier;
 	
-	public SocketClient(final Builder builder) {
-		pool = new EndpointConnectionStatePool(builder.getUrl(), (int) builder.getTimeout(TimeUnit.MILLISECONDS), 
-				builder.getSslContext(), builder.getEventReporter(), builder.getPeerPersistenceFile());
+	public SocketClient(final SiteToSiteClientConfig config) {
+		pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), 
+				config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
 		
-		this.compress = builder.isUseCompression();
-		this.portIdentifier = builder.getPortIdentifier();
-		this.portName = builder.getPortName();
-		this.penalizationNanos = builder.getPenalizationPeriod(TimeUnit.NANOSECONDS);
+		this.compress = config.isUseCompression();
+		this.portIdentifier = config.getPortIdentifier();
+		this.portName = config.getPortName();
+		this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
 	}
 	
 	


[2/3] incubator-nifi git commit: NIFI-282: Added ability for client to request batch size and duration when pulling data from remote NiFi

Posted by ma...@apache.org.
NIFI-282: Added ability for client to request batch size and duration when pulling data from remote NiFi


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/20557d38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/20557d38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/20557d38

Branch: refs/heads/site-to-site-client
Commit: 20557d386c6fb049836be0109212a903ed818f54
Parents: bbe335d
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 2 20:42:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 2 20:42:34 2015 -0500

----------------------------------------------------------------------
 .../nifi/remote/client/SiteToSiteClient.java    |  56 +++++++
 .../remote/client/SiteToSiteClientConfig.java   |  30 ++++
 .../socket/EndpointConnectionStatePool.java     |  16 ++
 .../protocol/socket/HandshakeProperty.java      |  40 ++++-
 .../protocol/socket/SocketClientProtocol.java   |  30 +++-
 .../socket/SocketFlowFileServerProtocol.java    | 157 ++++++++++++-------
 6 files changed, 270 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index f4d6f17..0a05c58 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -27,6 +27,7 @@ import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.remote.Transaction;
 import org.apache.nifi.remote.TransferDirection;
 import org.apache.nifi.remote.client.socket.SocketClient;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 /**
  * <p>
@@ -113,6 +114,9 @@ public interface SiteToSiteClient extends Closeable {
 		private boolean useCompression;
 		private String portName;
 		private String portIdentifier;
+		private int batchCount;
+		private long batchSize;
+		private long batchNanos;
 
 		/**
 		 * Specifies the URL of the remote NiFi instance. If this URL points to the Cluster Manager of
@@ -238,6 +242,43 @@ public interface SiteToSiteClient extends Closeable {
 		}
 		
 		/**
+	     * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+	     * the client has the ability to request a particular batch size/duration. This method specifies
+	     * the preferred number of {@link DataPacket}s to include in a Transaction.
+	     * 
+	     * @return
+	     */
+		public Builder requestBatchCount(final int count) {
+		    this.batchCount = count;
+		    return this;
+		}
+
+		/**
+	     * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+	     * the client has the ability to request a particular batch size/duration. This method specifies
+	     * the preferred number of bytes to include in a Transaction.
+	     * 
+	     * @return
+	     */
+		public Builder requestBatchSize(final long bytes) {
+		    this.batchSize = bytes;
+		    return this;
+		}
+		
+        /**
+         * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+         * the client has the ability to request a particular batch size/duration. This method specifies
+         * the preferred amount of time that a Transaction should span.
+         * 
+         * @return
+         */
+		public Builder requestBatchDuration(final long value, final TimeUnit unit) {
+		    this.batchNanos = unit.toNanos(value);
+		    return this;
+		}
+		
+		
+		/**
 		 * Builds a new SiteToSiteClient that can be used to send and receive data with remote instances of NiFi
 		 * @return
 		 */
@@ -296,6 +337,21 @@ public interface SiteToSiteClient extends Closeable {
 				public EventReporter getEventReporter() {
 					return Builder.this.getEventReporter();
 				}
+
+		        @Override
+		        public long getPreferredBatchDuration(final TimeUnit timeUnit) {
+		            return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
+		        }
+		        
+		        @Override
+		        public long getPreferredBatchSize() {
+		            return Builder.this.batchSize;
+		        }
+		        
+		        @Override
+		        public int getPreferredBatchCount() {
+		            return Builder.this.batchCount;
+		        }
 			};
 			
 			return new SocketClient(config);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index 6ba2d3f..37c48f8 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.remote.protocol.DataPacket;
 
 public interface SiteToSiteClientConfig {
 
@@ -81,4 +82,33 @@ public interface SiteToSiteClientConfig {
 	 * @return
 	 */
 	String getPortIdentifier();
+	
+	/**
+	 * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+	 * the client has the ability to request a particular batch size/duration. This returns the maximum
+	 * amount of time that we will request a NiFi instance to send data to us in a Transaction.
+	 * 
+	 * @param timeUnit
+	 * @return
+	 */
+	long getPreferredBatchDuration(TimeUnit timeUnit);
+	
+    /**
+     * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+     * the client has the ability to request a particular batch size/duration. This returns the maximum
+     * number of bytes that we will request a NiFi instance to send data to us in a Transaction.
+     * 
+     * @return
+     */
+	long getPreferredBatchSize();
+	
+	
+	/**
+     * When pulling data from a NiFi instance, the sender chooses how large a Transaction is. However,
+     * the client has the ability to request a particular batch size/duration. This returns the maximum
+     * number of {@link DataPacket}s that we will request a NiFi instance to send data to us in a Transaction.
+     * 
+     * @return
+     */
+	int getPreferredBatchCount();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
index e0ed61f..df42efe 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java
@@ -63,6 +63,7 @@ import org.apache.nifi.remote.PeerStatus;
 import org.apache.nifi.remote.RemoteDestination;
 import org.apache.nifi.remote.RemoteResourceInitiator;
 import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.cluster.ClusterNodeInformation;
 import org.apache.nifi.remote.cluster.NodeInformation;
 import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -186,7 +187,14 @@ public class EndpointConnectionStatePool {
     	}, 5, 5, TimeUnit.SECONDS);
     }
     
+    
     public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
+        return getEndpointConnectionState(remoteDestination, direction, null);
+    }
+    
+    
+    
+    public EndpointConnectionState getEndpointConnectionState(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
     	//
         // Attempt to get a connection state that already exists for this URL.
         //
@@ -229,6 +237,14 @@ public class EndpointConnectionStatePool {
                 
                 final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort();
                 peer = new Peer(commsSession, peerUrl, clusterUrl.toString());
+
+                // set properties based on config
+                if ( config != null ) {
+                    protocol.setTimeout((int) config.getTimeout(TimeUnit.MILLISECONDS));
+                    protocol.setPreferredBatchCount(config.getPreferredBatchCount());
+                    protocol.setPreferredBatchSize(config.getPreferredBatchSize());
+                    protocol.setPreferredBatchDuration(config.getPreferredBatchDuration(TimeUnit.MILLISECONDS));
+                }
                 
                 // perform handshake
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
index c4519cd..41dc276 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java
@@ -16,8 +16,46 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
+
+/**
+ * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol.
+ */
 public enum HandshakeProperty {
+    /**
+     * Boolean value indicating whether or not the contents of a FlowFile should be
+     * GZipped when transferred.
+     */
     GZIP,
+    
+    /**
+     * The unique identifier of the port to communicate with
+     */
     PORT_IDENTIFIER,
-    REQUEST_EXPIRATION_MILLIS;
+    
+    /**
+     * Indicates the number of milliseconds after the request was made that the client
+     * will wait for a response. If no response has been received by the time this value
+     * expires, the server can move on without attempting to service the request because
+     * the client will have already disconnected.
+     */
+    REQUEST_EXPIRATION_MILLIS,
+    
+    /**
+     * The preferred number of FlowFiles that the server should send to the client
+     * when pulling data. This property was introduced in version 5 of the protocol.
+     */
+    BATCH_COUNT,
+    
+    /**
+     * The preferred number of bytes that the server should send to the client when
+     * pulling data. This property was introduced in version 5 of the protocol.
+     */
+    BATCH_SIZE,
+    
+    /**
+     * The preferred amount of time that the server should send data to the client
+     * when pulling data. This property was introduced in version 5 of the protocol.
+     * Value is in milliseconds.
+     */
+    BATCH_DURATION;
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
index 4222edf..6976cd8 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java
@@ -56,7 +56,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SocketClientProtocol implements ClientProtocol {
-    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
 
     private RemoteDestination destination;
     private boolean useCompression = false;
@@ -70,12 +70,28 @@ public class SocketClientProtocol implements ClientProtocol {
     private boolean readyForFileTransfer = false;
     private String transitUriPrefix = null;
     private int timeoutMillis = 30000;
+    
+    private int batchCount;
+    private long batchSize;
+    private long batchMillis;
 
     private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
     
     public SocketClientProtocol() {
     }
 
+    public void setPreferredBatchCount(final int count) {
+        this.batchCount = count;
+    }
+    
+    public void setPreferredBatchSize(final long bytes) {
+        this.batchSize = bytes;
+    }
+    
+    public void setPreferredBatchDuration(final long millis) {
+        this.batchMillis = millis;
+    }
+    
     public void setDestination(final RemoteDestination destination) {
         this.destination = destination;
         this.useCompression = destination.isUseCompression();
@@ -106,6 +122,18 @@ public class SocketClientProtocol implements ClientProtocol {
         
         properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) );
         
+        if ( versionNegotiator.getVersion() >= 5 ) {
+            if ( batchCount > 0 ) {
+                properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount));
+            }
+            if ( batchSize > 0L ) {
+                properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize));
+            }
+            if ( batchMillis > 0L ) {
+                properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis));
+            }
+        }
+        
         final CommunicationsSession commsSession = peer.getCommunicationsSession();
         commsSession.setTimeout(timeoutMillis);
         final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/20557d38/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
index 12c234e..eb22b0e 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java
@@ -78,10 +78,14 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
     private FlowFileCodec negotiatedFlowFileCodec = null;
     private String transitUriPrefix = null;
     
-    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1);
+    private int requestedBatchCount = 0;
+    private long requestedBatchBytes = 0L;
+    private long requestedBatchNanos = 0L;
+    private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);
+    
+    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1);
     private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);
     
-    private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds
 
     
     @Override
@@ -137,68 +141,90 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
                 throw new HandshakeException("Received unknown property: " + propertyName);
             }
             
-            switch (property) {
-                case GZIP: {
-                    useGzip = Boolean.parseBoolean(value);
-                    break;
-                }
-                case REQUEST_EXPIRATION_MILLIS:
-                    requestExpirationMillis = Long.parseLong(value);
-                    break;
-                case PORT_IDENTIFIER: {
-                    Port receivedPort = rootGroup.getInputPort(value);
-                    if ( receivedPort == null ) {
-                        receivedPort = rootGroup.getOutputPort(value);
-                    }
-                    if ( receivedPort == null ) {
-                        logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
-                        ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                        throw new HandshakeException("Received unknown port identifier: " + value);
-                    }
-                    if ( !(receivedPort instanceof RootGroupPort) ) {
-                        logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
-                        ResponseCode.UNKNOWN_PORT.writeResponse(dos);
-                        throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
-                    }
-                    
-                    this.port = (RootGroupPort) receivedPort;
-                    final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
-                    if ( !portAuthResult.isAuthorized() ) {
-                        logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
-                        ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
-                        responseWritten = true;
+            try {
+                switch (property) {
+                    case GZIP: {
+                        useGzip = Boolean.parseBoolean(value);
                         break;
                     }
-                    
-                    if ( !receivedPort.isValid() ) {
-                        logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                        ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
-                        responseWritten = true;
+                    case REQUEST_EXPIRATION_MILLIS:
+                        requestExpirationMillis = Long.parseLong(value);
                         break;
-                    }
-                    
-                    if ( !receivedPort.isRunning() ) {
-                        logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
-                        ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
-                        responseWritten = true;
+                    case BATCH_COUNT:
+                        requestedBatchCount = Integer.parseInt(value);
+                        if ( requestedBatchCount < 0 ) {
+                            throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value);
+                        }
                         break;
-                    }
-                    
-                    // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this
-                    // we we will simply not service the request but the sender will timeout
-                    if ( getVersionNegotiator().getVersion() > 1 ) {
-                        for ( final Connection connection : port.getConnections() ) {
-                            if ( connection.getFlowFileQueue().isFull() ) {
-                                logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
-                                ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
-                                responseWritten = true;
-                                break;
+                    case BATCH_SIZE:
+                        requestedBatchBytes = Long.parseLong(value);
+                        if ( requestedBatchBytes < 0 ) {
+                            throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value);
+                        }
+                        break;
+                    case BATCH_DURATION:
+                        requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value));
+                        if ( requestedBatchNanos < 0 ) {
+                            throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value);
+                        }
+                        break;
+                    case PORT_IDENTIFIER: {
+                        Port receivedPort = rootGroup.getInputPort(value);
+                        if ( receivedPort == null ) {
+                            receivedPort = rootGroup.getOutputPort(value);
+                        }
+                        if ( receivedPort == null ) {
+                            logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                            throw new HandshakeException("Received unknown port identifier: " + value);
+                        }
+                        if ( !(receivedPort instanceof RootGroupPort) ) {
+                            logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value);
+                            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
+                            throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort");
+                        }
+                        
+                        this.port = (RootGroupPort) receivedPort;
+                        final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
+                        if ( !portAuthResult.isAuthorized() ) {
+                            logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation());
+                            ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        if ( !receivedPort.isValid() ) {
+                            logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                            ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        if ( !receivedPort.isRunning() ) {
+                            logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
+                            ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
+                            responseWritten = true;
+                            break;
+                        }
+                        
+                        // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this
+                        // we we will simply not service the request but the sender will timeout
+                        if ( getVersionNegotiator().getVersion() > 1 ) {
+                            for ( final Connection connection : port.getConnections() ) {
+                                if ( connection.getFlowFileQueue().isFull() ) {
+                                    logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
+                                    ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
+                                    responseWritten = true;
+                                    break;
+                                }
                             }
                         }
+                        
+                        break;
                     }
-                    
-                    break;
                 }
+            } catch (final NumberFormatException nfe) {
+                throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value);
             }
         }
         
@@ -333,8 +359,25 @@ public class SocketFlowFileServerProtocol implements ServerProtocol {
             session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
             session.remove(flowFile);
             
+            // determine if we should check for more data on queue.
             final long sendingNanos = System.nanoTime() - startNanos;
-            if ( sendingNanos < BATCH_NANOS ) { 
+            boolean poll = true;
+            if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) {
+                poll = false;
+            }
+            if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) {
+                poll = false;
+            }
+            if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) {
+                poll = false;
+            }
+            
+            if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) {
+                poll = (sendingNanos < DEFAULT_BATCH_NANOS);
+            }
+            
+            if ( poll ) { 
+                // we've not elapsed the requested sending duration, so get more data.
                 flowFile = session.get();
             } else {
                 flowFile = null;


[3/3] incubator-nifi git commit: NIFI-282: Updated documentation

Posted by ma...@apache.org.
NIFI-282: Updated documentation


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1d4b1848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1d4b1848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1d4b1848

Branch: refs/heads/site-to-site-client
Commit: 1d4b1848087049788232ef433fb5a505cf05bab3
Parents: 20557d3
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Feb 3 20:33:44 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Feb 3 20:33:44 2015 -0500

----------------------------------------------------------------------
 .../nifi/remote/client/SiteToSiteClient.java    |  8 +++
 .../nifi/remote/client/socket/SocketClient.java |  6 ++
 .../nifi/remote/SocketRemoteSiteListener.java   |  3 +
 .../nifi/remote/StandardRemoteGroupPort.java    | 72 +++++++++++++++++++-
 .../apache/nifi/remote/RemoteDestination.java   | 20 ++++++
 5 files changed, 108 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 0a05c58..fa94b81 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -98,6 +98,14 @@ public interface SiteToSiteClient extends Closeable {
 	
 	/**
 	 * <p>
+	 * Returns the configuration object that was built by the Builder
+	 * </p>
+	 * @return
+	 */
+	SiteToSiteClientConfig getConfig();
+	
+	/**
+	 * <p>
 	 * The Builder is the mechanism by which all configuration is passed to the SiteToSiteClient.
 	 * Once constructed, the SiteToSiteClient cannot be reconfigured (i.e., it is immutable). If
 	 * a change in configuration should be desired, the client should be {@link Closeable#close() closed}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index c04a90b..0494d04 100644
--- a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -32,6 +32,7 @@ import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.util.ObjectHolder;
 
 public class SocketClient implements SiteToSiteClient {
+    private final SiteToSiteClientConfig config;
 	private final EndpointConnectionStatePool pool;
 	private final boolean compress;
 	private final String portName;
@@ -42,12 +43,17 @@ public class SocketClient implements SiteToSiteClient {
 		pool = new EndpointConnectionStatePool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), 
 				config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile());
 		
+		this.config = config;
 		this.compress = config.isUseCompression();
 		this.portIdentifier = config.getPortIdentifier();
 		this.portName = config.getPortName();
 		this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
 	}
 	
+	@Override
+	public SiteToSiteClientConfig getConfig() {
+	    return config;
+	}
 	
 	@Override
 	public boolean isSecure() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
index f053e65..3295956 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/SocketRemoteSiteListener.java
@@ -122,6 +122,9 @@ public class SocketRemoteSiteListener implements RemoteSiteListener {
                     }
                     LOG.trace("Got connection");
                     
+                    if ( stopped.get() ) {
+                        return;
+                    }
                     final Socket socket = acceptedSocket;
                     final SocketChannel socketChannel = socket.getChannel();
                     final Thread thread = new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
index a51cdba..3fc2f5a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
+++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/StandardRemoteGroupPort.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.remote;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,11 +33,13 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.client.SiteToSiteClientConfig;
 import org.apache.nifi.remote.client.socket.EndpointConnectionState;
 import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool;
 import org.apache.nifi.remote.codec.FlowFileCodec;
@@ -142,7 +145,74 @@ public class StandardRemoteGroupPort extends RemoteGroupPort {
         
         final EndpointConnectionState connectionState;
         try {
-        	connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection);
+            // TODO: TESTING ONLY!! REMOVE!!
+            final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
+                @Override
+                public boolean isUseCompression() {
+                    return false;
+                }
+                
+                @Override
+                public String getUrl() {
+                    return null;
+                }
+                
+                @Override
+                public long getTimeout(TimeUnit timeUnit) {
+                    return timeUnit.convert(1, TimeUnit.SECONDS);
+                }
+                
+                @Override
+                public SSLContext getSslContext() {
+                    return null;
+                }
+                
+                @Override
+                public long getPreferredBatchSize() {
+                    return 1024 * 1024L;
+                }
+                
+                @Override
+                public long getPreferredBatchDuration(TimeUnit timeUnit) {
+                    return timeUnit.convert(1, TimeUnit.SECONDS);
+                }
+                
+                @Override
+                public int getPreferredBatchCount() {
+                    return 1;
+                }
+                
+                @Override
+                public String getPortName() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+                
+                @Override
+                public String getPortIdentifier() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+                
+                @Override
+                public long getPenalizationPeriod(TimeUnit timeUnit) {
+                    // TODO Auto-generated method stub
+                    return 0;
+                }
+                
+                @Override
+                public File getPeerPersistenceFile() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+                
+                @Override
+                public EventReporter getEventReporter() {
+                    // TODO Auto-generated method stub
+                    return null;
+                }
+            };
+        	connectionState = connectionStatePool.getEndpointConnectionState(this, transferDirection, config);
         } catch (final PortNotRunningException e) {
             context.yield();
             this.targetRunning.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1d4b1848/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
index 8c972f7..f718581 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/remote/RemoteDestination.java
@@ -18,10 +18,30 @@ package org.apache.nifi.remote;
 
 import java.util.concurrent.TimeUnit;
 
+
+/**
+ * A model object for referring to a remote destination (i.e., a Port) for site-to-site communications
+ */
 public interface RemoteDestination {
+    /**
+     * Returns the identifier of the remote destination
+     * 
+     * @return
+     */
 	String getIdentifier();
 	
+	/**
+	 * Returns the amount of time that system should pause sending to a particular node if unable to 
+	 * send data to or receive data from this endpoint
+	 * @param timeUnit
+	 * @return
+	 */
 	long getYieldPeriod(TimeUnit timeUnit);
 	
+	/**
+	 * Returns whether or not compression should be used when transferring data to or receiving
+	 * data from the remote endpoint
+	 * @return
+	 */
 	boolean isUseCompression();
 }