You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/24 22:44:18 UTC
incubator-nifi git commit: NIFI-271
Repository: incubator-nifi
Updated Branches:
refs/heads/develop dca93a507 -> d68f71b12
NIFI-271
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d68f71b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d68f71b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d68f71b1
Branch: refs/heads/develop
Commit: d68f71b126c58112070d81f14e804497f9649e5b
Parents: dca93a5
Author: joewitt <jo...@apache.org>
Authored: Fri Apr 24 16:44:04 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Apr 24 16:44:04 2015 -0400
----------------------------------------------------------------------
nifi/nifi-external/nifi-spark-receiver/pom.xml | 22 +-
.../org/apache/nifi/spark/NiFiDataPacket.java | 23 +-
.../org/apache/nifi/spark/NiFiReceiver.java | 236 ++++++++++---------
3 files changed, 145 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml b/nifi/nifi-external/nifi-spark-receiver/pom.xml
index 5c93f6b..a6d9378 100644
--- a/nifi/nifi-external/nifi-spark-receiver/pom.xml
+++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml
@@ -23,15 +23,15 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-spark-receiver</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_2.10</artifactId>
- <version>1.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-site-to-site-client</artifactId>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_2.10</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-site-to-site-client</artifactId>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
index 2f08dc5..484c2a9 100644
--- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
@@ -20,21 +20,20 @@ import java.util.Map;
/**
* <p>
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
- * content and its attributes so that they can be processed by Spark
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by
+ * Spark
* </p>
*/
public interface NiFiDataPacket {
- /**
- * Returns the contents of a NiFi FlowFile
- * @return
- */
- byte[] getContent();
+ /**
+ * @return the contents of a NiFi FlowFile
+ */
+ byte[] getContent();
- /**
- * Returns a Map of attributes that are associated with the NiFi FlowFile
- * @return
- */
- Map<String, String> getAttributes();
+ /**
+ * @return a Map of attributes that are associated with the NiFi FlowFile
+ */
+ Map<String, String> getAttributes();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
index 9f31062..8cbf60c 100644
--- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
@@ -31,64 +31,67 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
-
/**
* <p>
- * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data
- * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects
- * to NiFi instance provided in the config and requests data from
- * the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group,
- * it acts as a queue of data for remote clients. This receiver is then able to pull that data
- * from NiFi reliably.
+ * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
+ * pull data from Apache NiFi so that it can be processed by Spark Streaming.
+ * The NiFi Receiver connects to the NiFi instance provided in the config and
+ * requests data from the OutputPort that is named. In NiFi, when an OutputPort
+ * is added to the root process group, it acts as a queue of data for remote
+ * clients. This receiver is then able to pull that data from NiFi reliably.
* </p>
- *
+ *
* <p>
- * It is important to note that if pulling data from a NiFi cluster, the URL that should be used
- * is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes
- * in that cluster and pull from those nodes as appropriate.
+ * It is important to note that if pulling data from a NiFi cluster, the URL
+ * that should be used is that of the NiFi Cluster Manager. The Receiver will
+ * automatically handle determining the nodes in that cluster and pull from
+ * those nodes as appropriate.
* </p>
- *
+ *
* <p>
- * In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide
- * to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}.
- * Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This
- * example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark".
- * Additionally, it assumes that the data that it will receive from this OutputPort is text
- * data, as it will map the byte array received from NiFi to a UTF-8 Encoded string.
+ * In order to use the NiFiReceiver, you will need to first build a
+ * {@link SiteToSiteClientConfig} to provide to the constructor. This can be
+ * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
+ * snippet of driver code to pull data from NiFi that is running on
+ * localhost:8080. This example assumes that NiFi exposes an OutputPort on the
+ * root group named "Data For Spark". Additionally, it assumes that the data
+ * that it will receive from this OutputPort is text data, as it will map the
+ * byte array received from NiFi to a UTF-8 Encoded string.
* </p>
- *
+ *
* <code>
* <pre>
+ * {@code
* Pattern SPACE = Pattern.compile(" ");
- *
+ *
* // Build a Site-to-site client config
* SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
* .setUrl("http://localhost:8080/nifi")
* .setPortName("Data For Spark")
* .buildConfig();
- *
+ *
* SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
* JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
- *
- * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
+ *
+ * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
* // specified Port
- * JavaReceiverInputDStream<NiFiDataPacket> packetStream =
+ * JavaReceiverInputDStream<NiFiDataPacket> packetStream =
* ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
- *
+ *
* // Map the data from NiFi to text, ignoring the attributes
* JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
* public String call(final NiFiDataPacket dataPacket) throws Exception {
* return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
* }
* });
- *
+ *
* // Split the words by spaces
* JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
* public Iterable<String> call(final String text) throws Exception {
* return Arrays.asList(SPACE.split(text));
* }
* });
- *
+ *
* // Map each word to the number 1, then aggregate by key
* JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
* new PairFunction<String, String, Integer>() {
@@ -101,98 +104,105 @@ import org.apache.spark.streaming.receiver.Receiver;
* }
* }
* );
- *
+ *
* // print the results
* wordCounts.print();
* ssc.start();
* ssc.awaitTermination();
+ * }
* </pre>
* </code>
*/
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
- private static final long serialVersionUID = 3067274587595578836L;
- private final SiteToSiteClientConfig clientConfig;
-
- public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
- super(storageLevel);
- this.clientConfig = clientConfig;
- }
-
- @Override
- public void onStart() {
- final Thread thread = new Thread(new ReceiveRunnable());
- thread.setDaemon(true);
- thread.setName("NiFi Receiver");
- thread.start();
- }
-
- @Override
- public void onStop() {
- }
-
- class ReceiveRunnable implements Runnable {
- public ReceiveRunnable() {
- }
-
- public void run() {
- try {
- final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
- try {
- while ( !isStopped() ) {
- final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
- DataPacket dataPacket = transaction.receive();
- if ( dataPacket == null ) {
- transaction.confirm();
- transaction.complete();
-
- // no data available. Wait a bit and try again
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {}
-
- continue;
- }
-
- final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>();
- do {
- // Read the data into a byte array and wrap it along with the attributes
- // into a NiFiDataPacket.
- final InputStream inStream = dataPacket.getData();
- final byte[] data = new byte[(int) dataPacket.getSize()];
- StreamUtils.fillBuffer(inStream, data);
-
- final Map<String, String> attributes = dataPacket.getAttributes();
- final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
- public byte[] getContent() {
- return data;
- }
-
- public Map<String, String> getAttributes() {
- return attributes;
- }
- };
-
- dataPackets.add(NiFiDataPacket);
- dataPacket = transaction.receive();
- } while ( dataPacket != null );
-
- // Confirm transaction to verify the data
- transaction.confirm();
-
- store(dataPackets.iterator());
-
- transaction.complete();
- }
- } finally {
- try {
- client.close();
- } catch (final IOException ioe) {
- reportError("Failed to close client", ioe);
- }
- }
- } catch (final IOException ioe) {
- restart("Failed to receive data from NiFi", ioe);
- }
- }
- }
+
+ private static final long serialVersionUID = 3067274587595578836L;
+ private final SiteToSiteClientConfig clientConfig;
+
+ public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
+ super(storageLevel);
+ this.clientConfig = clientConfig;
+ }
+
+ @Override
+ public void onStart() {
+ final Thread thread = new Thread(new ReceiveRunnable());
+ thread.setDaemon(true);
+ thread.setName("NiFi Receiver");
+ thread.start();
+ }
+
+ @Override
+ public void onStop() {
+ }
+
+ class ReceiveRunnable implements Runnable {
+
+ public ReceiveRunnable() {
+ }
+
+ @Override
+ public void run() {
+ try {
+ final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+ try {
+ while (!isStopped()) {
+ final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+ DataPacket dataPacket = transaction.receive();
+ if (dataPacket == null) {
+ transaction.confirm();
+ transaction.complete();
+
+ // no data available. Wait a bit and try again
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ }
+
+ continue;
+ }
+
+ final List<NiFiDataPacket> dataPackets = new ArrayList<>();
+ do {
+ // Read the data into a byte array and wrap it along with the attributes
+ // into a NiFiDataPacket.
+ final InputStream inStream = dataPacket.getData();
+ final byte[] data = new byte[(int) dataPacket.getSize()];
+ StreamUtils.fillBuffer(inStream, data);
+
+ final Map<String, String> attributes = dataPacket.getAttributes();
+ final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
+ @Override
+ public byte[] getContent() {
+ return data;
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return attributes;
+ }
+ };
+
+ dataPackets.add(NiFiDataPacket);
+ dataPacket = transaction.receive();
+ } while (dataPacket != null);
+
+ // Confirm transaction to verify the data
+ transaction.confirm();
+
+ store(dataPackets.iterator());
+
+ transaction.complete();
+ }
+ } finally {
+ try {
+ client.close();
+ } catch (final IOException ioe) {
+ reportError("Failed to close client", ioe);
+ }
+ }
+ } catch (final IOException ioe) {
+ restart("Failed to receive data from NiFi", ioe);
+ }
+ }
+ }
}