You are viewing a plain-text version of this content; the canonical link to the original message is not preserved in this view.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/07 06:55:13 UTC
git commit: [HOTFIX] SPARK-1637: There are some Streaming examples
added after the PR #571 was last updated.
Repository: spark
Updated Branches:
refs/heads/master 48ba3b8cd -> fdae095de
[HOTFIX] SPARK-1637: Some Streaming examples were added after PR #571 was last updated.
This resulted in compilation errors.
cc @mateiz: the project currently does not compile.
Author: Sandeep <sa...@techaddict.me>
Closes #673 from techaddict/SPARK-1637-HOTFIX and squashes the following commits:
b512f4f [Sandeep] [SPARK-1637][HOTFIX] There are some Streaming examples added after the PR #571 was last updated. This resulted in Compilation Errors.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdae095d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdae095d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdae095d
Branch: refs/heads/master
Commit: fdae095de2daa1fc3b343c05e515235756d856a4
Parents: 48ba3b8
Author: Sandeep <sa...@techaddict.me>
Authored: Tue May 6 21:55:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue May 6 21:55:05 2014 -0700
----------------------------------------------------------------------
.../examples/streaming/JavaCustomReceiver.java | 151 ++++++++++++++++++
.../streaming/examples/JavaCustomReceiver.java | 153 -------------------
.../examples/streaming/CustomReceiver.scala | 108 +++++++++++++
.../streaming/examples/CustomReceiver.scala | 108 -------------
4 files changed, 259 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
new file mode 100644
index 0000000..7f558f3
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: JavaCustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost 9999`
+ */
+
+public class JavaCustomReceiver extends Receiver<String> {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaCustomReceiver <master> <hostname> <port>\n" +
+          "In local mode, <master> should be 'local[n]' with n > 1");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 1 second batch size. Both the application name and the
+    // class used to locate the jar refer to this example itself.
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaCustomReceiver",
+        new Duration(1000), System.getenv("SPARK_HOME"),
+        JavaStreamingContext.jarOfClass(JavaCustomReceiver.class));
+
+    // Create an input stream with the custom receiver on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
+        new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    ssc.start();
+    ssc.awaitTermination();
+  }
+
+  // ============= Receiver code that receives data over a socket ==============
+
+  /** Host name of the TCP server this receiver connects to. */
+  String host = null;
+  /** Port of the TCP server this receiver connects to. */
+  int port = -1;
+
+  /**
+   * @param host_ host name of the TCP server to read lines from
+   * @param port_ port of the TCP server to read lines from
+   */
+  public JavaCustomReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  @Override
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  @Override
+  public void onStop() {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      // Decode explicitly as UTF-8 rather than relying on the platform default charset.
+      BufferedReader reader = new BufferedReader(
+          new InputStreamReader(socket.getInputStream(), "UTF-8"));
+
+      // Until stopped or connection broken continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again");
+    } catch (ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch (Throwable t) {
+      restart("Error receiving data", t);
+    } finally {
+      // Release the socket even if reading failed part-way; on the normal path it is
+      // already closed and closing it again is a no-op.
+      if (socket != null) {
+        try {
+          socket.close();
+        } catch (java.io.IOException ignored) {
+          // best effort: nothing more useful to do with a failed close here
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
deleted file mode 100644
index e36c780..0000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.Receiver;
-import scala.Tuple2;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ConnectException;
-import java.net.Socket;
-import java.util.regex.Pattern;
-
-/**
- * Custom Receiver that receives data over a socket. Received bytes is interpreted as
- * text and \n delimited lines are considered as records. They are then counted and printed.
- *
- * Usage: JavaCustomReceiver <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
- */
-
-public class JavaCustomReceiver extends Receiver<String> {
- private static final Pattern SPACE = Pattern.compile(" ");
-
- public static void main(String[] args) {
- if (args.length < 3) {
- System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1");
- System.exit(1);
- }
-
- StreamingExamples.setStreamingLogLevels();
-
- // Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"),
- JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
-
- // Create a input stream with the custom receiver on target ip:port and count the
- // words in input stream of \n delimited text (eg. generated by 'nc')
- JavaReceiverInputDStream<String> lines = ssc.receiverStream(
- new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
- JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
- }
- });
- JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
- }
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- @Override
- public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
-
- wordCounts.print();
- ssc.start();
- ssc.awaitTermination();
- }
-
- // ============= Receiver code that receives data over a socket ==============
-
- String host = null;
- int port = -1;
-
- public JavaCustomReceiver(String host_ , int port_) {
- super(StorageLevel.MEMORY_AND_DISK_2());
- host = host_;
- port = port_;
- }
-
- public void onStart() {
- // Start the thread that receives data over a connection
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
- }
-
- public void onStop() {
- // There is nothing much to do as the thread calling receive()
- // is designed to stop by itself isStopped() returns false
- }
-
- /** Create a socket connection and receive data until receiver is stopped */
- private void receive() {
- Socket socket = null;
- String userInput = null;
-
- try {
- // connect to the server
- socket = new Socket(host, port);
-
- BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
- // Until stopped or connection broken continue reading
- while (!isStopped() && (userInput = reader.readLine()) != null) {
- System.out.println("Received data '" + userInput + "'");
- store(userInput);
- }
- reader.close();
- socket.close();
-
- // Restart in an attempt to connect again when server is active again
- restart("Trying to connect again");
- } catch(ConnectException ce) {
- // restart if could not connect to server
- restart("Could not connect", ce);
- } catch(Throwable t) {
- restart("Error receiving data", t);
- }
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
new file mode 100644
index 0000000..e317e2d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.io.{InputStreamReader, BufferedReader, InputStream}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text and \n delimited lines are considered as records. They are then counted and printed.
+ *
+ * Usage: CustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999`
+ */
+object CustomReceiver {
+  def main(args: Array[String]): Unit = {
+    if (args.length < 3) {
+      System.err.println("Usage: CustomReceiver <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Create the context with a 1 second batch size; the app name matches this example
+    val ssc = new StreamingContext(args(0), "CustomReceiver", Seconds(1),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    // Create an input stream with the custom receiver on target ip:port and count the
+    // words in the input stream of \n delimited text (e.g. generated by 'nc')
+    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+
+/**
+ * Receiver that connects to `host`:`port`, stores every UTF-8 text line it reads, and
+ * asks Spark to restart it when the connection ends or fails.
+ *
+ * @param host host name of the TCP server to read from
+ * @param port port of the TCP server to read from
+ */
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  import scala.util.control.NonFatal
+
+  def onStart(): Unit = {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run(): Unit = receive()
+    }.start()
+  }
+
+  def onStop(): Unit = {
+    // There is nothing much to do as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive(): Unit = {
+    var socket: Socket = null
+    try {
+      logInfo(s"Connecting to $host:$port")
+      socket = new Socket(host, port)
+      logInfo(s"Connected to $host:$port")
+      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+      // Read a line, then check: stop at end of stream or once the receiver is stopped.
+      Iterator.continually(reader.readLine())
+        .takeWhile(line => !isStopped && line != null)
+        .foreach(line => store(line))
+      reader.close()
+      socket.close()
+      logInfo("Stopped receiving")
+      restart("Trying to connect again")
+    } catch {
+      case e: java.net.ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+      // Fatal JVM errors (e.g. OutOfMemoryError) are not swallowed into a restart.
+      case NonFatal(t) =>
+        restart("Error receiving data", t)
+    } finally {
+      // Release the socket even when reading failed part-way; on the normal path it is
+      // already closed and closing it again is a no-op.
+      if (socket != null) {
+        try socket.close() catch {
+          case e: java.io.IOException => logWarning("Error closing socket", e)
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
deleted file mode 100644
index eebffd8..0000000
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples
-
-import java.io.{InputStreamReader, BufferedReader, InputStream}
-import java.net.Socket
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * Custom Receiver that receives data over a socket. Received bytes is interpreted as
- * text and \n delimited lines are considered as records. They are then counted and printed.
- *
- * Usage: CustomReceiver <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- * <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
- */
-object CustomReceiver {
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1")
- System.exit(1)
- }
-
- StreamingExamples.setStreamingLogLevels()
-
- // Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
- System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
-
- // Create a input stream with the custom receiver on target ip:port and count the
- // words in input stream of \n delimited text (eg. generated by 'nc')
- val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
-}
-
-
-class CustomReceiver(host: String, port: Int)
- extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
-
- def onStart() {
- // Start the thread that receives data over a connection
- new Thread("Socket Receiver") {
- override def run() { receive() }
- }.start()
- }
-
- def onStop() {
- // There is nothing much to do as the thread calling receive()
- // is designed to stop by itself isStopped() returns false
- }
-
- /** Create a socket connection and receive data until receiver is stopped */
- private def receive() {
- var socket: Socket = null
- var userInput: String = null
- try {
- logInfo("Connecting to " + host + ":" + port)
- socket = new Socket(host, port)
- logInfo("Connected to " + host + ":" + port)
- val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
- userInput = reader.readLine()
- while(!isStopped && userInput != null) {
- store(userInput)
- userInput = reader.readLine()
- }
- reader.close()
- socket.close()
- logInfo("Stopped receiving")
- restart("Trying to connect again")
- } catch {
- case e: java.net.ConnectException =>
- restart("Error connecting to " + host + ":" + port, e)
- case t: Throwable =>
- restart("Error receiving data", t)
- }
- }
-}