You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/11/10 20:47:44 UTC

spark git commit: SPARK-2548 [STREAMING] JavaRecoverableWordCount is missing

Repository: spark
Updated Branches:
  refs/heads/master ed8bf1eac -> 3a02d416c


SPARK-2548 [STREAMING] JavaRecoverableWordCount is missing

Here's my attempt to re-port `RecoverableNetworkWordCount` to Java, following the example of its Scala and Java siblings. I fixed a few minor doc/formatting issues along the way I believe.

Author: Sean Owen <so...@cloudera.com>

Closes #2564 from srowen/SPARK-2548 and squashes the following commits:

0d0bf29 [Sean Owen] Update checkpoint call as in https://github.com/apache/spark/pull/2735
35f23e3 [Sean Owen] Remove old comment about running in standalone mode
179b3c2 [Sean Owen] Re-port RecoverableNetworkWordCount to Java example, and touch up doc / formatting in related examples


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a02d416
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a02d416
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a02d416

Branch: refs/heads/master
Commit: 3a02d416cd82a7a942fd6ff4a0e05ff070eb218a
Parents: ed8bf1e
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Nov 10 11:47:27 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Nov 10 11:47:27 2014 -0800

----------------------------------------------------------------------
 .../streaming/JavaNetworkWordCount.java         |   7 +-
 .../JavaRecoverableNetworkWordCount.java        | 154 +++++++++++++++++++
 .../streaming/RecoverableNetworkWordCount.scala |  15 +-
 3 files changed, 159 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a02d416/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 45bcede..3e9f0f4 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -25,7 +25,7 @@ import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.api.java.StorageLevels;
-import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
@@ -35,8 +35,9 @@ import java.util.regex.Pattern;
 
 /**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ *
  * Usage: JavaNetworkWordCount <hostname> <port>
- *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
@@ -56,7 +57,7 @@ public final class JavaNetworkWordCount {
 
     // Create the context with a 1 second batch size
     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
-    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,  new Duration(1000));
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
 
     // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')

http://git-wip-us.apache.org/repos/asf/spark/blob/3a02d416/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
new file mode 100644
index 0000000..bceda97
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import scala.Tuple2;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
+
+/**
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
+ *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
+ *   data. <checkpoint-directory> is a directory on an HDFS-compatible file system in which
+ *   checkpoint data is saved. <output-file> is the file to which the word counts are appended
+ *
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *
+ *      `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ *      `$ ./bin/run-example org.apache.spark.examples.streaming.JavaRecoverableNetworkWordCount \
+ *              localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * Refer to the online documentation for more details.
+ */
+public final class JavaRecoverableNetworkWordCount {
+  private static final Pattern SPACE = Pattern.compile(" ");
+
+  private static JavaStreamingContext createContext(String ip,
+                                                    int port,
+                                                    String checkpointDirectory,
+                                                    String outputPath) {
+    // Builds a new context (checkpointing to checkpointDirectory) with the word-count pipeline.
+    // If this message is not printed, the StreamingContext was restored from existing
+    // checkpoint data instead of being created here (see getOrCreate in main)
+    System.out.println("Creating new context");
+    final File outputFile = new File(outputPath);
+    if (outputFile.exists()) {
+      outputFile.delete();  // NOTE(review): return value ignored, so a failed delete is silent
+    }
+    SparkConf sparkConf = new SparkConf().setAppName("JavaRecoverableNetworkWordCount");
+    // Create the context with a 1 second batch interval and set its checkpoint directory
+    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
+    ssc.checkpoint(checkpointDirectory);
+
+    // Create a socket stream on target ip:port and count the words in the '\n'-delimited
+    // input text (e.g. generated by 'nc'); each line is split on single spaces (SPACE)
+    JavaReceiverInputDStream<String> lines = ssc.socketTextStream(ip, port);
+    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+      @Override
+      public Iterable<String> call(String x) {
+        return Lists.newArrayList(SPACE.split(x));
+      }
+    });
+    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
+      new PairFunction<String, String, Integer>() {
+        @Override
+        public Tuple2<String, Integer> call(String s) {
+          return new Tuple2<String, Integer>(s, 1);
+        }
+      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+        @Override
+        public Integer call(Integer i1, Integer i2) {
+          return i1 + i2;
+        }
+      });
+
+    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+      @Override
+      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+        String counts = "Counts at time " + time + " " + rdd.collect();
+        System.out.println(counts);
+        System.out.println("Appending to " + outputFile.getAbsolutePath());
+        Files.append(counts + "\n", outputFile, Charset.defaultCharset());
+        return null;  // Void return type: null is the only value that can be returned
+      }
+    });
+
+    return ssc;
+  }
+
+  public static void main(String[] args) {
+    // Exactly four positional arguments are expected; otherwise print the usage and exit.
+    if (args.length != 4) {
+      System.err.println("Your arguments were " + Arrays.asList(args));
+      System.err.println(
+          "Usage: JavaRecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>\n" +
+          "     <output-file>. <hostname> and <port> describe the TCP server that Spark\n" +
+          "     Streaming would connect to receive data. <checkpoint-directory> directory to\n" +
+          "     HDFS-compatible file system which checkpoint data <output-file> file to which\n" +
+          "     the word counts will be appended\n" +
+          "\n" +
+          "Both <checkpoint-directory> and <output-file> must be absolute paths");
+      System.exit(1);
+    }
+
+    final String ip = args[0];
+    final int port = Integer.parseInt(args[1]);
+    final String checkpointDirectory = args[2];
+    final String outputPath = args[3];
+    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
+      @Override
+      public JavaStreamingContext create() {
+        return createContext(ip, port, checkpointDirectory, outputPath);
+      }
+    };
+    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3a02d416/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index 6af3a0f..eb48db8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -31,15 +31,13 @@ import org.apache.spark.util.IntParam
 /**
  * Counts words in text encoded with UTF8 received from the network every second.
  *
- * Usage: NetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
+ * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
  *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
  *   <output-file> file to which the word counts will be appended
  *
- * In local mode, <master> should be 'local[n]' with n > 1
  * <checkpoint-directory> and <output-file> must be absolute paths
  *
- *
  * To run this on your local machine, you need to first run a Netcat server
  *
  *      `$ nc -lk 9999`
@@ -54,19 +52,8 @@ import org.apache.spark.util.IntParam
  * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
  * the checkpoint data.
  *
- * To run this example in a local standalone cluster with automatic driver recovery,
- *
- *      `$ bin/spark-class org.apache.spark.deploy.Client -s launch <cluster-url> \
- *              <path-to-examples-jar> \
- *              org.apache.spark.examples.streaming.RecoverableNetworkWordCount <cluster-url> \
- *              localhost 9999 ~/checkpoint ~/out`
- *
- * <path-to-examples-jar> would typically be
- * <spark-dir>/examples/target/scala-XX/spark-examples....jar
- *
  * Refer to the online documentation for more details.
  */
-
 object RecoverableNetworkWordCount {
 
   def createContext(ip: String, port: Int, outputPath: String) = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org