You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/16 21:19:06 UTC
incubator-nifi git commit: NIFI-282: Added send(byte[], Map) method to avoid having to create a DataPacket object
Repository: incubator-nifi
Updated Branches:
refs/heads/nifi-site-to-site-client e16fc7972 -> 2f60ddc03
NIFI-282: Added send(byte[], Map<String, String>) method to avoid having to create a DataPacket object
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2f60ddc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2f60ddc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2f60ddc0
Branch: refs/heads/nifi-site-to-site-client
Commit: 2f60ddc03a3e867ea3b0826621aa63439a10bee7
Parents: e16fc79
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Feb 16 15:18:57 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Feb 16 15:18:57 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/nifi/remote/Transaction.java | 11 +++++++++++
.../nifi/remote/client/socket/SocketClient.java | 6 ++++++
.../apache/nifi/remote/protocol/DataPacket.java | 18 +++++++++++++++++-
.../protocol/socket/SocketClientTransaction.java | 8 ++++++++
4 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
index 51bf244..eb7312d 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/Transaction.java
@@ -17,6 +17,7 @@
package org.apache.nifi.remote;
import java.io.IOException;
+import java.util.Map;
import org.apache.nifi.remote.protocol.DataPacket;
@@ -81,6 +82,16 @@ public interface Transaction {
void send(DataPacket dataPacket) throws IOException;
/**
+ * Sends the given byte array as the content of a {@link DataPacket} along with the
+ * provided attributes
+ *
+ * @param content
+ * @param attributes
+ * @throws IOException
+ */
+ void send(byte[] content, Map<String, String> attributes) throws IOException;
+
+ /**
* Retrieves information from the remote NiFi instance, if any is available. If no data is available, will return
* {@code null}. It is important to consume all data from the remote NiFi instance before attempting to
* call {@link #confirm()}. This is because the sender is always responsible for determining when the Transaction
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index c11c2ab..bd9319f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -17,6 +17,7 @@
package org.apache.nifi.remote.client.socket;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.remote.Communicant;
@@ -186,6 +187,11 @@ public class SocketClient implements SiteToSiteClient {
}
@Override
+ public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+ transaction.send(content, attributes);
+ }
+
+ @Override
public DataPacket receive() throws IOException {
return transaction.receive();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
index f4fa4d0..3f0ec4f 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java
@@ -19,11 +19,27 @@ package org.apache.nifi.remote.protocol;
import java.io.InputStream;
import java.util.Map;
+
+/**
+ * Represents a piece of data that is to be sent to or that was received from a NiFi instance.
+ */
public interface DataPacket {
+ /**
+ * The key-value attributes that are to be associated with the data
+ * @return
+ */
Map<String, String> getAttributes();
+ /**
+ * An InputStream from which the content can be read
+ * @return
+ */
InputStream getData();
-
+
+ /**
+ * The length of the InputStream.
+ * @return
+ */
long getSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2f60ddc0/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
index b2fffed..2fbcfc4 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java
@@ -16,11 +16,13 @@
*/
package org.apache.nifi.remote.protocol.socket;
+import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
@@ -36,6 +38,7 @@ import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
+import org.apache.nifi.remote.util.StandardDataPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -172,6 +175,11 @@ public class SocketClientTransaction implements Transaction {
@Override
+ public void send(final byte[] content, final Map<String, String> attributes) throws IOException {
+ send(new StandardDataPacket(attributes, new ByteArrayInputStream(content), content.length));
+ }
+
+ @Override
public void send(final DataPacket dataPacket) throws IOException {
try {
try {