Posted to commits@spark.apache.org by pw...@apache.org on 2014/05/07 06:55:13 UTC

git commit: [HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 was last updated.

Repository: spark
Updated Branches:
  refs/heads/master 48ba3b8cd -> fdae095de


[HOTFIX] SPARK-1637: There are some Streaming examples added after the PR #571 was last updated.

This resulted in compilation errors.
cc @mateiz: the project is not compiling currently.

Author: Sandeep <sa...@techaddict.me>

Closes #673 from techaddict/SPARK-1637-HOTFIX and squashes the following commits:

b512f4f [Sandeep] [SPARK-1637][HOTFIX] There are some Streaming examples added after the PR #571 was last updated. This resulted in Compilation Errors.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdae095d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdae095d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdae095d

Branch: refs/heads/master
Commit: fdae095de2daa1fc3b343c05e515235756d856a4
Parents: 48ba3b8
Author: Sandeep <sa...@techaddict.me>
Authored: Tue May 6 21:55:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue May 6 21:55:05 2014 -0700

----------------------------------------------------------------------
 .../examples/streaming/JavaCustomReceiver.java  | 151 ++++++++++++++++++
 .../streaming/examples/JavaCustomReceiver.java  | 153 -------------------
 .../examples/streaming/CustomReceiver.scala     | 108 +++++++++++++
 .../streaming/examples/CustomReceiver.scala     | 108 -------------
 4 files changed, 259 insertions(+), 261 deletions(-)
----------------------------------------------------------------------
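
For context, the diff below is a straight package move: the two custom receiver examples are relocated from org.apache.spark.streaming.examples to org.apache.spark.examples.streaming, where the rest of the Streaming examples now live. A minimal, hypothetical Scala sketch of what the move means for code that refers to these example classes (package and class names are taken from the diff; the snippet itself is only illustrative):

    // Before this commit the Scala example sat in the old package:
    //   import org.apache.spark.streaming.examples.CustomReceiver

    // After this commit it sits alongside the other Streaming examples:
    import org.apache.spark.examples.streaming.CustomReceiver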


http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
new file mode 100644
index 0000000..7f558f3
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.receiver.Receiver;
+import scala.Tuple2;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.util.regex.Pattern;
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text and \n-delimited lines are treated as records. They are then counted and printed.
+ *
+ * Usage: JavaCustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.examples.streaming.JavaCustomReceiver local[2] localhost 9999`
+ */
+
+public class JavaCustomReceiver extends Receiver<String> {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  public static void main(String[] args) {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaCustomReceiver <master> <hostname> <port>\n" +
+          "In local mode, <master> should be 'local[n]' with n > 1");
+      System.exit(1);
+    }
+
+    StreamingExamples.setStreamingLogLevels();
+
+    // Create the context with a 1 second batch size
+    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaCustomReceiver",
+            new Duration(1000), System.getenv("SPARK_HOME"),
+            JavaStreamingContext.jarOfClass(JavaCustomReceiver.class));
+
+    // Create an input stream with the custom receiver on the target ip:port and count the
+    // words in the input stream of \n-delimited text (e.g. generated by 'nc')
+    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
+      new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.print();
+    ssc.start();
+    ssc.awaitTermination();
+  }
+
+  // ============= Receiver code that receives data over a socket ==============
+
+  String host = null;
+  int port = -1;
+
+  public JavaCustomReceiver(String host_ , int port_) {
+    super(StorageLevel.MEMORY_AND_DISK_2());
+    host = host_;
+    port = port_;
+  }
+
+  public void onStart() {
+    // Start the thread that receives data over a connection
+    new Thread()  {
+      @Override public void run() {
+        receive();
+      }
+    }.start();
+  }
+
+  public void onStop() {
+    // There is nothing much to do here, as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private void receive() {
+    Socket socket = null;
+    String userInput = null;
+
+    try {
+      // connect to the server
+      socket = new Socket(host, port);
+
+      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+
+      // Until stopped or connection broken continue reading
+      while (!isStopped() && (userInput = reader.readLine()) != null) {
+        System.out.println("Received data '" + userInput + "'");
+        store(userInput);
+      }
+      reader.close();
+      socket.close();
+
+      // Restart in an attempt to connect again when server is active again
+      restart("Trying to connect again");
+    } catch(ConnectException ce) {
+      // restart if could not connect to server
+      restart("Could not connect", ce);
+    } catch(Throwable t) {
+      restart("Error receiving data", t);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
deleted file mode 100644
index e36c780..0000000
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaCustomReceiver.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples;
-
-import com.google.common.collect.Lists;
-
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.Receiver;
-import scala.Tuple2;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ConnectException;
-import java.net.Socket;
-import java.util.regex.Pattern;
-
-/**
- * Custom Receiver that receives data over a socket. Received bytes is interpreted as
- * text and \n delimited lines are considered as records. They are then counted and printed.
- *
- * Usage: JavaCustomReceiver <master> <hostname> <port>
- *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- *    `$ nc -lk 9999`
- * and then run the example
- *    `$ ./run org.apache.spark.streaming.examples.JavaCustomReceiver local[2] localhost 9999`
- */
-
-public class JavaCustomReceiver extends Receiver<String> {
-  private static final Pattern SPACE = Pattern.compile(" ");
-
-  public static void main(String[] args) {
-    if (args.length < 3) {
-      System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
-          "In local mode, <master> should be 'local[n]' with n > 1");
-      System.exit(1);
-    }
-
-    StreamingExamples.setStreamingLogLevels();
-
-    // Create the context with a 1 second batch size
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
-            new Duration(1000), System.getenv("SPARK_HOME"),
-            JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
-
-    // Create a input stream with the custom receiver on target ip:port and count the
-    // words in input stream of \n delimited text (eg. generated by 'nc')
-    JavaReceiverInputDStream<String> lines = ssc.receiverStream(
-      new JavaCustomReceiver(args[1], Integer.parseInt(args[2])));
-    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-      @Override
-      public Iterable<String> call(String x) {
-        return Lists.newArrayList(SPACE.split(x));
-      }
-    });
-    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
-      new PairFunction<String, String, Integer>() {
-        @Override public Tuple2<String, Integer> call(String s) {
-          return new Tuple2<String, Integer>(s, 1);
-        }
-      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
-        @Override
-        public Integer call(Integer i1, Integer i2) {
-          return i1 + i2;
-        }
-      });
-
-    wordCounts.print();
-    ssc.start();
-    ssc.awaitTermination();
-  }
-
-  // ============= Receiver code that receives data over a socket ==============
-
-  String host = null;
-  int port = -1;
-
-  public JavaCustomReceiver(String host_ , int port_) {
-    super(StorageLevel.MEMORY_AND_DISK_2());
-    host = host_;
-    port = port_;
-  }
-
-  public void onStart() {
-    // Start the thread that receives data over a connection
-    new Thread()  {
-      @Override public void run() {
-        receive();
-      }
-    }.start();
-  }
-
-  public void onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
-  }
-
-  /** Create a socket connection and receive data until receiver is stopped */
-  private void receive() {
-    Socket socket = null;
-    String userInput = null;
-
-    try {
-      // connect to the server
-      socket = new Socket(host, port);
-
-      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-
-      // Until stopped or connection broken continue reading
-      while (!isStopped() && (userInput = reader.readLine()) != null) {
-        System.out.println("Received data '" + userInput + "'");
-        store(userInput);
-      }
-      reader.close();
-      socket.close();
-
-      // Restart in an attempt to connect again when server is active again
-      restart("Trying to connect again");
-    } catch(ConnectException ce) {
-      // restart if could not connect to server
-      restart("Could not connect", ce);
-    } catch(Throwable t) {
-      restart("Error receiving data", t);
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
new file mode 100644
index 0000000..e317e2d
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming
+
+import java.io.{InputStreamReader, BufferedReader, InputStream}
+import java.net.Socket
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * Custom Receiver that receives data over a socket. Received bytes are interpreted as
+ * text and \n-delimited lines are treated as records. They are then counted and printed.
+ *
+ * Usage: CustomReceiver <master> <hostname> <port>
+ *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk 9999`
+ * and then run the example
+ *    `$ ./run org.apache.spark.examples.streaming.CustomReceiver local[2] localhost 9999`
+ */
+object CustomReceiver {
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: CustomReceiver <master> <hostname> <port>\n" +
+        "In local mode, <master> should be 'local[n]' with n > 1")
+      System.exit(1)
+    }
+
+    StreamingExamples.setStreamingLogLevels()
+
+    // Create the context with a 1 second batch size
+    val ssc = new StreamingContext(args(0), "CustomReceiver", Seconds(1),
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
+
+    // Create an input stream with the custom receiver on the target ip:port and count the
+    // words in the input stream of \n-delimited text (e.g. generated by 'nc')
+    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
+    val words = lines.flatMap(_.split(" "))
+    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+    wordCounts.print()
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+
+class CustomReceiver(host: String, port: Int)
+  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
+
+  def onStart() {
+    // Start the thread that receives data over a connection
+    new Thread("Socket Receiver") {
+      override def run() { receive() }
+    }.start()
+  }
+
+  def onStop() {
+    // There is nothing much to do here, as the thread calling receive()
+    // is designed to stop by itself once isStopped() returns true
+  }
+
+  /** Create a socket connection and receive data until receiver is stopped */
+  private def receive() {
+   var socket: Socket = null
+   var userInput: String = null
+   try {
+     logInfo("Connecting to " + host + ":" + port)
+     socket = new Socket(host, port)
+     logInfo("Connected to " + host + ":" + port)
+     val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
+     userInput = reader.readLine()
+     while(!isStopped && userInput != null) {
+       store(userInput)
+       userInput = reader.readLine()
+     }
+     reader.close()
+     socket.close()
+     logInfo("Stopped receiving")
+     restart("Trying to connect again")
+   } catch {
+     case e: java.net.ConnectException =>
+       restart("Error connecting to " + host + ":" + port, e)
+     case t: Throwable =>
+       restart("Error receiving data", t)
+   }
+  }
+}
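
The receivers above read newline-delimited text from a TCP socket, which the scaladoc drives with `nc -lk 9999`. A rough, hypothetical stand-in for Netcat, written in the same style as the example (the object name, port, and record contents are illustrative only and are not part of this commit):

    import java.io.PrintWriter
    import java.net.ServerSocket

    // Hypothetical helper: listens like `nc -lk 9999` and writes newline-delimited
    // records, which CustomReceiver reads line by line and store()s.
    object SimpleLineServer {
      def main(args: Array[String]) {
        val server = new ServerSocket(9999)   // same port as in the example usage
        val socket = server.accept()          // wait for the receiver to connect
        val out = new PrintWriter(socket.getOutputStream, true)
        (1 to 100).foreach(i => out.println("record " + i))  // one record per line
        out.close()
        socket.close()
        server.close()
      }
    }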

http://git-wip-us.apache.org/repos/asf/spark/blob/fdae095d/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
deleted file mode 100644
index eebffd8..0000000
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/CustomReceiver.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.examples
-
-import java.io.{InputStreamReader, BufferedReader, InputStream}
-import java.net.Socket
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.receiver.Receiver
-
-/**
- * Custom Receiver that receives data over a socket. Received bytes is interpreted as
- * text and \n delimited lines are considered as records. They are then counted and printed.
- *
- * Usage: CustomReceiver <master> <hostname> <port>
- *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
- *   <hostname> and <port> of the TCP server that Spark Streaming would connect to receive data.
- *
- * To run this on your local machine, you need to first run a Netcat server
- *    `$ nc -lk 9999`
- * and then run the example
- *    `$ ./run org.apache.spark.streaming.examples.CustomReceiver local[2] localhost 9999`
- */
-object CustomReceiver {
-  def main(args: Array[String]) {
-    if (args.length < 3) {
-      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
-        "In local mode, <master> should be 'local[n]' with n > 1")
-      System.exit(1)
-    }
-
-    StreamingExamples.setStreamingLogLevels()
-
-    // Create the context with a 1 second batch size
-    val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass).toSeq)
-
-    // Create a input stream with the custom receiver on target ip:port and count the
-    // words in input stream of \n delimited text (eg. generated by 'nc')
-    val lines = ssc.receiverStream(new CustomReceiver(args(1), args(2).toInt))
-    val words = lines.flatMap(_.split(" "))
-    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
-    wordCounts.print()
-    ssc.start()
-    ssc.awaitTermination()
-  }
-}
-
-
-class CustomReceiver(host: String, port: Int)
-  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
-
-  def onStart() {
-    // Start the thread that receives data over a connection
-    new Thread("Socket Receiver") {
-      override def run() { receive() }
-    }.start()
-  }
-
-  def onStop() {
-   // There is nothing much to do as the thread calling receive()
-   // is designed to stop by itself isStopped() returns false
-  }
-
-  /** Create a socket connection and receive data until receiver is stopped */
-  private def receive() {
-   var socket: Socket = null
-   var userInput: String = null
-   try {
-     logInfo("Connecting to " + host + ":" + port)
-     socket = new Socket(host, port)
-     logInfo("Connected to " + host + ":" + port)
-     val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
-     userInput = reader.readLine()
-     while(!isStopped && userInput != null) {
-       store(userInput)
-       userInput = reader.readLine()
-     }
-     reader.close()
-     socket.close()
-     logInfo("Stopped receiving")
-     restart("Trying to connect again")
-   } catch {
-     case e: java.net.ConnectException =>
-       restart("Error connecting to " + host + ":" + port, e)
-     case t: Throwable =>
-       restart("Error receiving data", t)
-   }
-  }
-}