You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/02/26 04:54:24 UTC

[21/51] [abbrv] incubator-nifi git commit: NIFI-282: Added send(byte[], Map) method to avoid having to create a DataPacket object

NIFI-282: Added send(byte[], Map<String, String>) method to avoid having to create a DataPacket object


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2f60ddc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2f60ddc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2f60ddc0

Branch: refs/heads/NIFI-353
Commit: 2f60ddc03a3e867ea3b0826621aa63439a10bee7
Parents: e16fc79
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 16 15:18:57 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 16 15:18:57 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/remote/Transaction.java  | 11 +++++++++++
 .../nifi/remote/client/socket/SocketClient.java   |  6 ++++++
 .../apache/nifi/remote/protocol/DataPacket.java   | 18 +++++++++++++++++-
 .../protocol/socket/SocketClientTransaction.java  |  8 ++++++++
 4 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 51bf244..eb7312d 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.remote;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.nifi.remote.protocol.DataPacket;
 
@@ -81,6 +82,16 @@ public interface Transaction {
     void send(DataPacket dataPacket) throws IOException;
     
     /**
+     * Sends the given byte array as the content of a {@link DataPacket} along with the
+     * provided attributes
+     * 
+     * @param content
+     * @param attributes
+     * @throws IOException
+     */
+    void send(byte[] content, Map<String, String> attributes) throws IOException;
+    
+    /**
      * Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return
      * {@code null}. It is important to consume all data from the remote NiFi instance before attempting to 
      * call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index c11c2ab..bd9319f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.remote.client.socket;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.remote.Communicant;
@@ -186,6 +187,11 @@ public class SocketClient implements SiteToSiteClient {
 			}
 
 			@Override
+			public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+			    transaction.send(content, attributes);
+			}
+			
+			@Override
 			public DataPacket receive() throws IOException {
 				return transaction.receive();
 			}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
index f4fa4d0..3f0ec4f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -19,11 +19,27 @@ package org.apache.nifi.remote.protocol;
 import java.io.InputStream;
 import java.util.Map;
 
+
+/**
+ * Represents a piece of data that is to be sent to or that was received from a NiFi instance.
+ */
 public interface DataPacket {
 
+    /**
+     * The key-value attributes that are to be associated with the data
+     * @return
+     */
 	Map<String, String> getAttributes();
 	
+	/**
+	 * An InputStream from which the content can be read
+	 * @return
+	 */
 	InputStream getData();
-	
+
+	/**
+	 * The length of the InputStream.
+	 * @return
+	 */
 	long getSize();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index b2fffed..2fbcfc4 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -16,11 +16,13 @@
  */
 package org.apache.nifi.remote.protocol.socket;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Map;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.CheckedOutputStream;
@@ -36,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
 import org.apache.nifi.remote.io.CompressionOutputStream;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -172,6 +175,11 @@ public class SocketClientTransaction implements Transaction {
 	
 	
 	@Override
+	public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+	    send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
+	}
+	
+	@Override
 	public void send(final DataPacket dataPacket) throws IOException {
 	    try {
 	        try {