You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/24 22:44:18 UTC

incubator-nifi git commit: NIFI-271

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop dca93a507 -> d68f71b12


NIFI-271


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d68f71b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d68f71b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d68f71b1

Branch: refs/heads/develop
Commit: d68f71b126c58112070d81f14e804497f9649e5b
Parents: dca93a5
Author: joewitt <jo...@apache.org>
Authored: Fri Apr 24 16:44:04 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri Apr 24 16:44:04 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-external/nifi-spark-receiver/pom.xml  |  22 +-
 .../org/apache/nifi/spark/NiFiDataPacket.java   |  23 +-
 .../org/apache/nifi/spark/NiFiReceiver.java     | 236 ++++++++++---------
 3 files changed, 145 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/pom.xml b/nifi/nifi-external/nifi-spark-receiver/pom.xml
index 5c93f6b..a6d9378 100644
--- a/nifi/nifi-external/nifi-spark-receiver/pom.xml
+++ b/nifi/nifi-external/nifi-spark-receiver/pom.xml
@@ -23,15 +23,15 @@
     <groupId>org.apache.nifi</groupId>
     <artifactId>nifi-spark-receiver</artifactId>
 
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.spark</groupId>
-			<artifactId>spark-streaming_2.10</artifactId>
-			<version>1.2.0</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.nifi</groupId>
-			<artifactId>nifi-site-to-site-client</artifactId>
-		</dependency>
-	</dependencies>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_2.10</artifactId>
+            <version>1.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-site-to-site-client</artifactId>
+        </dependency>
+    </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
index 2f08dc5..484c2a9 100644
--- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
@@ -20,21 +20,20 @@ import java.util.Map;
 
 /**
  * <p>
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's
- * content and its attributes so that they can be processed by Spark
+ * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both
+ * a FlowFile's content and its attributes so that they can be processed by
+ * Spark
  * </p>
  */
 public interface NiFiDataPacket {
 
-	/**
-	 * Returns the contents of a NiFi FlowFile
-	 * @return
-	 */
-	byte[] getContent();
+    /**
+     * @return the contents of a NiFi FlowFile
+     */
+    byte[] getContent();
 
-	/**
-	 * Returns a Map of attributes that are associated with the NiFi FlowFile
-	 * @return
-	 */
-	Map<String, String> getAttributes();
+    /**
+     * @return a Map of attributes that are associated with the NiFi FlowFile
+     */
+    Map<String, String> getAttributes();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d68f71b1/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
index 9f31062..8cbf60c 100644
--- a/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
+++ b/nifi/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
@@ -31,64 +31,67 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;
 
-
 /**
  * <p>
- * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to pull data 
- * from Apache NiFi so that it can be processed by Spark Streaming. The NiFi Receiver connects 
- * to NiFi instance provided in the config and requests data from
- * the OutputPort that is named. In NiFi, when an OutputPort is added to the root process group,
- * it acts as a queue of data for remote clients. This receiver is then able to pull that data
- * from NiFi reliably.
+ * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
+ * pull data from Apache NiFi so that it can be processed by Spark Streaming.
+ * The NiFi Receiver connects to NiFi instance provided in the config and
+ * requests data from the OutputPort that is named. In NiFi, when an OutputPort
+ * is added to the root process group, it acts as a queue of data for remote
+ * clients. This receiver is then able to pull that data from NiFi reliably.
  * </p>
- * 
+ *
  * <p>
- * It is important to note that if pulling data from a NiFi cluster, the URL that should be used
- * is that of the NiFi Cluster Manager. The Receiver will automatically handle determining the nodes
- * in that cluster and pull from those nodes as appropriate.
+ * It is important to note that if pulling data from a NiFi cluster, the URL
+ * that should be used is that of the NiFi Cluster Manager. The Receiver will
+ * automatically handle determining the nodes in that cluster and pull from
+ * those nodes as appropriate.
  * </p>
- * 
+ *
  * <p>
- * In order to use the NiFiReceiver, you will need to first build a {@link SiteToSiteClientConfig} to provide 
- * to the constructor. This can be achieved by using the {@link SiteToSiteClient.Builder}.
- * Below is an example snippet of driver code to pull data from NiFi that is running on localhost:8080. This
- * example assumes that NiFi exposes and OutputPort on the root group named "Data For Spark".
- * Additionally, it assumes that the data that it will receive from this OutputPort is text
- * data, as it will map the byte array received from NiFi to a UTF-8 Encoded string.
+ * In order to use the NiFiReceiver, you will need to first build a
+ * {@link SiteToSiteClientConfig} to provide to the constructor. This can be
+ * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
+ * snippet of driver code to pull data from NiFi that is running on
+ * localhost:8080. This example assumes that NiFi exposes and OutputPort on the
+ * root group named "Data For Spark". Additionally, it assumes that the data
+ * that it will receive from this OutputPort is text data, as it will map the
+ * byte array received from NiFi to a UTF-8 Encoded string.
  * </p>
- * 
+ *
  * <code>
  * <pre>
+ * {@code
  * Pattern SPACE = Pattern.compile(" ");
- * 
+ *
  * // Build a Site-to-site client config
  * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
  *   .setUrl("http://localhost:8080/nifi")
  *   .setPortName("Data For Spark")
  *   .buildConfig();
- * 
+ *
  * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
  * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
- * 
- * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from 
+ *
+ * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
  * // specified Port
- * JavaReceiverInputDStream<NiFiDataPacket> packetStream = 
+ * JavaReceiverInputDStream<NiFiDataPacket> packetStream =
  *     ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
- * 
+ *
  * // Map the data from NiFi to text, ignoring the attributes
  * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
  *   public String call(final NiFiDataPacket dataPacket) throws Exception {
  *     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
  *   }
  * });
- * 
+ *
  * // Split the words by spaces
  * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
  *   public Iterable<String> call(final String text) throws Exception {
  *     return Arrays.asList(SPACE.split(text));
  *   }
  * });
- * 	    
+ *
  * // Map each word to the number 1, then aggregate by key
  * JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  *   new PairFunction<String, String, Integer>() {
@@ -101,98 +104,105 @@ import org.apache.spark.streaming.receiver.Receiver;
  *     }
  *    }
  *  );
- * 
+ *
  * // print the results
  * wordCounts.print();
  * ssc.start();
  * ssc.awaitTermination();
+ * }
  * </pre>
  * </code>
  */
 public class NiFiReceiver extends Receiver<NiFiDataPacket> {
-	private static final long serialVersionUID = 3067274587595578836L;
-	private final SiteToSiteClientConfig clientConfig;
-	
-	public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
-		super(storageLevel);
-		this.clientConfig = clientConfig;
-	}
-	
-	@Override
-	public void onStart() {
-		final Thread thread = new Thread(new ReceiveRunnable());
-		thread.setDaemon(true);
-		thread.setName("NiFi Receiver");
-		thread.start();
-	}
-
-	@Override
-	public void onStop() {
-	}
-
-	class ReceiveRunnable implements Runnable {
-		public ReceiveRunnable() {
-		}
-		
-		public void run() {
-			try {
-				final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-				try {
-					while ( !isStopped() ) {
-						final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
-						DataPacket dataPacket = transaction.receive();
-						if ( dataPacket == null ) {
-							transaction.confirm();
-							transaction.complete();
-							
-							// no data available. Wait a bit and try again
-							try {
-								Thread.sleep(1000L);
-							} catch (InterruptedException e) {}
-							
-							continue;
-						}
-	
-						final List<NiFiDataPacket> dataPackets = new ArrayList<NiFiDataPacket>();
-						do {
-							// Read the data into a byte array and wrap it along with the attributes
-							// into a NiFiDataPacket.
-							final InputStream inStream = dataPacket.getData();
-							final byte[] data = new byte[(int) dataPacket.getSize()];
-							StreamUtils.fillBuffer(inStream, data);
-							
-							final Map<String, String> attributes = dataPacket.getAttributes();
-							final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
-								public byte[] getContent() {
-									return data;
-								}
-
-								public Map<String, String> getAttributes() {
-									return attributes;
-								}
-							};
-							
-							dataPackets.add(NiFiDataPacket);
-							dataPacket = transaction.receive();
-						} while ( dataPacket != null );
-
-						// Confirm transaction to verify the data
-						transaction.confirm();
-						
-						store(dataPackets.iterator());
-						
-						transaction.complete();
-					}
-				} finally {
-					try {
-						client.close();
-					} catch (final IOException ioe) {
-						reportError("Failed to close client", ioe);
-					}
-				}
-			} catch (final IOException ioe) {
-				restart("Failed to receive data from NiFi", ioe);
-			}
-		}
-	}
+
+    private static final long serialVersionUID = 3067274587595578836L;
+    private final SiteToSiteClientConfig clientConfig;
+
+    public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
+        super(storageLevel);
+        this.clientConfig = clientConfig;
+    }
+
+    @Override
+    public void onStart() {
+        final Thread thread = new Thread(new ReceiveRunnable());
+        thread.setDaemon(true);
+        thread.setName("NiFi Receiver");
+        thread.start();
+    }
+
+    @Override
+    public void onStop() {
+    }
+
+    class ReceiveRunnable implements Runnable {
+
+        public ReceiveRunnable() {
+        }
+
+        @Override
+        public void run() {
+            try {
+                final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
+                try {
+                    while (!isStopped()) {
+                        final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
+                        DataPacket dataPacket = transaction.receive();
+                        if (dataPacket == null) {
+                            transaction.confirm();
+                            transaction.complete();
+
+                            // no data available. Wait a bit and try again
+                            try {
+                                Thread.sleep(1000L);
+                            } catch (InterruptedException e) {
+                            }
+
+                            continue;
+                        }
+
+                        final List<NiFiDataPacket> dataPackets = new ArrayList<>();
+                        do {
+                            // Read the data into a byte array and wrap it along with the attributes
+                            // into a NiFiDataPacket.
+                            final InputStream inStream = dataPacket.getData();
+                            final byte[] data = new byte[(int) dataPacket.getSize()];
+                            StreamUtils.fillBuffer(inStream, data);
+
+                            final Map<String, String> attributes = dataPacket.getAttributes();
+                            final NiFiDataPacket NiFiDataPacket = new NiFiDataPacket() {
+                                @Override
+                                public byte[] getContent() {
+                                    return data;
+                                }
+
+                                @Override
+                                public Map<String, String> getAttributes() {
+                                    return attributes;
+                                }
+                            };
+
+                            dataPackets.add(NiFiDataPacket);
+                            dataPacket = transaction.receive();
+                        } while (dataPacket != null);
+
+                        // Confirm transaction to verify the data
+                        transaction.confirm();
+
+                        store(dataPackets.iterator());
+
+                        transaction.complete();
+                    }
+                } finally {
+                    try {
+                        client.close();
+                    } catch (final IOException ioe) {
+                        reportError("Failed to close client", ioe);
+                    }
+                }
+            } catch (final IOException ioe) {
+                restart("Failed to receive data from NiFi", ioe);
+            }
+        }
+    }
 }