You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/11 10:31:55 UTC

spark git commit: [SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and indentation in documentation

Repository: spark
Updated Branches:
  refs/heads/master a45fefd17 -> 7186e8c31


[SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and indentation in documentation

## What changes were proposed in this pull request?

Originally this PR was based on #14491, but I realised that fixing the examples is more sensible than fixing the comments.

This PR fixes the three things below:

- Fix two wrong examples in `structured-streaming-programming-guide.md`. In Java, loading via `read.load(..)` without `as` returns `Dataset<Row>`, not `Dataset<String>`.

- Fix indentation across `structured-streaming-programming-guide.md`. Python examples should be indented with 4 spaces, and Scala and Java examples with 2 spaces; the existing examples were inconsistent.

- Fix `StructuredNetworkWordCountWindowed` and `StructuredNetworkWordCount` in Java and Scala to initially load `DataFrame` and `Dataset<Row>`, so that they are consistent with the comments and some examples in `structured-streaming-programming-guide.md`, and so that the Scala and Java examples match the Python one (which initially loads the input as a `DataFrame`).

## How was this patch tested?

N/A

Closes https://github.com/apache/spark/pull/14491

Author: hyukjinkwon <gu...@gmail.com>
Author: Ganesh Chand <ga...@Ganeshs-MacBook-Pro-2.local>

Closes #14564 from HyukjinKwon/SPARK-16886.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7186e8c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7186e8c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7186e8c3

Branch: refs/heads/master
Commit: 7186e8c3180b7f38250cf2f2de791472bf5325a5
Parents: a45fefd
Author: hyukjinkwon <gu...@gmail.com>
Authored: Thu Aug 11 11:31:52 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Aug 11 11:31:52 2016 +0100

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md  | 202 +++++++++----------
 .../JavaStructuredNetworkWordCount.java         |   6 +-
 .../JavaStructuredNetworkWordCountWindowed.java |  30 +--
 .../streaming/StructuredNetworkWordCount.scala  |   4 +-
 .../StructuredNetworkWordCountWindowed.scala    |   4 +-
 5 files changed, 124 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8c14c3d..99d50e5 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -46,9 +46,9 @@ import java.util.Arrays;
 import java.util.Iterator;
 
 SparkSession spark = SparkSession
-    .builder()
-    .appName("JavaStructuredNetworkWordCount")
-    .getOrCreate();
+  .builder()
+  .appName("JavaStructuredNetworkWordCount")
+  .getOrCreate();
 {% endhighlight %}
 
 </div>
@@ -95,7 +95,7 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
 
 {% highlight java %}
 // Create DataFrame representing the stream of input lines from connection to localhost:9999
-Dataset<String> lines = spark
+Dataset<Row> lines = spark
   .readStream()
   .format("socket")
   .option("host", "localhost")
@@ -104,14 +104,14 @@ Dataset<String> lines = spark
 
 // Split the lines into words
 Dataset<String> words = lines
-    .as(Encoders.STRING())
-    .flatMap(
-        new FlatMapFunction<String, String>() {
-          @Override
-          public Iterator<String> call(String x) {
-            return Arrays.asList(x.split(" ")).iterator();
-          }
-        }, Encoders.STRING());
+  .as(Encoders.STRING())
+  .flatMap(
+    new FlatMapFunction<String, String>() {
+      @Override
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
+      }
+    }, Encoders.STRING());
 
 // Generate running word count
 Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -125,11 +125,11 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
 {% highlight python %}
 # Create DataFrame representing the stream of input lines from connection to localhost:9999
 lines = spark\
-   .readStream\
-   .format('socket')\
-   .option('host', 'localhost')\
-   .option('port', 9999)\
-   .load()
+    .readStream\
+    .format('socket')\
+    .option('host', 'localhost')\
+    .option('port', 9999)\
+    .load()
 
 # Split the lines into words
 words = lines.select(
@@ -434,11 +434,11 @@ val spark: SparkSession = ...
 
 // Read text from socket 
 val socketDF = spark
-    .readStream
-    .format("socket")
-    .option("host", "localhost")
-    .option("port", 9999)
-    .load()
+  .readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load()
 
 socketDF.isStreaming    // Returns True for DataFrames that have streaming sources
 
@@ -447,10 +447,10 @@ socketDF.printSchema
 // Read all the csv files written atomically in a directory
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark
-    .readStream
-    .option("sep", ";")
-    .schema(userSchema)      // Specify schema of the csv files
-    .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
+  .readStream
+  .option("sep", ";")
+  .schema(userSchema)      // Specify schema of the csv files
+  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 </div>
@@ -461,11 +461,11 @@ SparkSession spark = ...
 
 // Read text from socket 
 Dataset[Row] socketDF = spark
-    .readStream()
-    .format("socket")
-    .option("host", "localhost")
-    .option("port", 9999)
-    .load();
+  .readStream()
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load();
 
 socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources
 
@@ -474,10 +474,10 @@ socketDF.printSchema();
 // Read all the csv files written atomically in a directory
 StructType userSchema = new StructType().add("name", "string").add("age", "integer");
 Dataset[Row] csvDF = spark
-    .readStream()
-    .option("sep", ";")
-    .schema(userSchema)      // Specify schema of the csv files
-    .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
+  .readStream()
+  .option("sep", ";")
+  .schema(userSchema)      // Specify schema of the csv files
+  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 </div>
@@ -549,12 +549,12 @@ import org.apache.spark.sql.expressions.javalang.typed;
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
 
 public class DeviceData {
-    private String device;
-    private String type;
-    private Double signal;
-    private java.sql.Date time;
-    ...
-    // Getter and setter methods for each field
+  private String device;
+  private String type;
+  private Double signal;
+  private java.sql.Date time;
+  ...
+  // Getter and setter methods for each field
 }
 
 Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
@@ -828,33 +828,33 @@ val noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 // Print new data to console
 noAggDF
-   .writeStream
-   .format("console")
-   .start()
+  .writeStream
+  .format("console")
+  .start()
 
 // Write new data to Parquet files
 noAggDF
-   .writeStream
-   .parquet("path/to/destination/directory")
-   .start()
+  .writeStream
+  .parquet("path/to/destination/directory")
+  .start()
    
 // ========== DF with aggregation ==========
 val aggDF = df.groupBy(\u201cdevice\u201d).count()
 
 // Print updated aggregations to console
 aggDF
-   .writeStream
-   .outputMode("complete")
-   .format("console")
-   .start()
+  .writeStream
+  .outputMode("complete")
+  .format("console")
+  .start()
 
 // Have all the aggregates in an in-memory table 
 aggDF
-   .writeStream
-   .queryName("aggregates")    // this query name will be the table name
-   .outputMode("complete")
-   .format("memory")
-   .start()
+  .writeStream
+  .queryName("aggregates")    // this query name will be the table name
+  .outputMode("complete")
+  .format("memory")
+  .start()
 
 spark.sql("select * from aggregates").show()   // interactively query in-memory table
 {% endhighlight %}
@@ -868,33 +868,33 @@ Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
 
 // Print new data to console
 noAggDF
-   .writeStream()
-   .format("console")
-   .start();
+  .writeStream()
+  .format("console")
+  .start();
 
 // Write new data to Parquet files
 noAggDF
-   .writeStream()
-   .parquet("path/to/destination/directory")
-   .start();
+  .writeStream()
+  .parquet("path/to/destination/directory")
+  .start();
    
 // ========== DF with aggregation ==========
 Dataset<Row> aggDF = df.groupBy(\u201cdevice\u201d).count();
 
 // Print updated aggregations to console
 aggDF
-   .writeStream()
-   .outputMode("complete")
-   .format("console")
-   .start();
+  .writeStream()
+  .outputMode("complete")
+  .format("console")
+  .start();
 
 // Have all the aggregates in an in-memory table 
 aggDF
-   .writeStream()
-   .queryName("aggregates")    // this query name will be the table name
-   .outputMode("complete")
-   .format("memory")
-   .start();
+  .writeStream()
+  .queryName("aggregates")    // this query name will be the table name
+  .outputMode("complete")
+  .format("memory")
+  .start();
 
 spark.sql("select * from aggregates").show();   // interactively query in-memory table
 {% endhighlight %}
@@ -908,33 +908,33 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
 
 # Print new data to console
 noAggDF\
-   .writeStream()\
-   .format("console")\
-   .start()
+    .writeStream()\
+    .format("console")\
+    .start()
 
 # Write new data to Parquet files
 noAggDF\
-   .writeStream()\
-   .parquet("path/to/destination/directory")\
-   .start()
+    .writeStream()\
+    .parquet("path/to/destination/directory")\
+    .start()
    
 # ========== DF with aggregation ==========
 aggDF = df.groupBy(\u201cdevice\u201d).count()
 
 # Print updated aggregations to console
 aggDF\
-   .writeStream()\
-   .outputMode("complete")\
-   .format("console")\
-   .start()
+    .writeStream()\
+    .outputMode("complete")\
+    .format("console")\
+    .start()
 
 # Have all the aggregates in an in memory table. The query name will be the table name
 aggDF\
-   .writeStream()\
-   .queryName("aggregates")\
-   .outputMode("complete")\
-   .format("memory")\
-   .start()
+    .writeStream()\
+    .queryName("aggregates")\
+    .outputMode("complete")\
+    .format("memory")\
+    .start()
 
 spark.sql("select * from aggregates").show()   # interactively query in-memory table
 {% endhighlight %}
@@ -1093,11 +1093,11 @@ In case of a failure or intentional shutdown, you can recover the previous progr
 
 {% highlight scala %}
 aggDF
-   .writeStream
-   .outputMode("complete")
-   .option(\u201ccheckpointLocation\u201d, \u201cpath/to/HDFS/dir\u201d)
-   .format("memory")
-   .start()
+  .writeStream
+  .outputMode("complete")
+  .option(\u201ccheckpointLocation\u201d, \u201cpath/to/HDFS/dir\u201d)
+  .format("memory")
+  .start()
 {% endhighlight %}
 
 </div>
@@ -1105,11 +1105,11 @@ aggDF
 
 {% highlight java %}
 aggDF
-   .writeStream()
-   .outputMode("complete")
-   .option(\u201ccheckpointLocation\u201d, \u201cpath/to/HDFS/dir\u201d)
-   .format("memory")
-   .start();
+  .writeStream()
+  .outputMode("complete")
+  .option(\u201ccheckpointLocation\u201d, \u201cpath/to/HDFS/dir\u201d)
+  .format("memory")
+  .start();
 {% endhighlight %}
 
 </div>
@@ -1117,11 +1117,11 @@ aggDF
 
 {% highlight python %}
 aggDF\
-   .writeStream()\
-   .outputMode("complete")\
-   .option(\u201ccheckpointLocation\u201d, \u201cpath/to/HDFS/dir\u201d)\
-   .format("memory")\
-   .start()
+    .writeStream()\
+    .outputMode("complete")\
+    .option(\u201ccheckpointLocation\u201d, \u201cpath/to/HDFS/dir\u201d)\
+    .format("memory")\
+    .start()
 {% endhighlight %}
 
 </div>

http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 346d218..c913ee0 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<String> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as(Encoders.STRING());
+      .load();
 
     // Split the lines into words
-    Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+    Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterator<String> call(String x) {
         return Arrays.asList(x.split(" ")).iterator();

http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 557d36c..172d053 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed {
       .getOrCreate();
 
     // Create DataFrame representing the stream of input lines from connection to host:port
-    Dataset<Tuple2<String, Timestamp>> lines = spark
+    Dataset<Row> lines = spark
       .readStream()
       .format("socket")
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+      .load();
 
     // Split the lines into words, retaining timestamps
-    Dataset<Row> words = lines.flatMap(
-      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
-        @Override
-        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
-          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
-          for (String word : t._1.split(" ")) {
-            result.add(new Tuple2<>(word, t._2));
+    Dataset<Row> words = lines
+      .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
+      .flatMap(
+        new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+          @Override
+          public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+            List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+            for (String word : t._1.split(" ")) {
+              result.add(new Tuple2<>(word, t._2));
+            }
+            return result.iterator();
           }
-          return result.iterator();
-        }
-      },
-      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
-    ).toDF("word", "timestamp");
+        },
+        Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+      ).toDF("word", "timestamp");
 
     // Group the data by window and word and compute the count of each group
     Dataset<Row> windowedCounts = words.groupBy(

http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 364bff2..f0756c4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -56,10 +56,10 @@ object StructuredNetworkWordCount {
       .format("socket")
       .option("host", host)
       .option("port", port)
-      .load().as[String]
+      .load()
 
     // Split the lines into words
-    val words = lines.flatMap(_.split(" "))
+    val words = lines.as[String].flatMap(_.split(" "))
 
     // Generate running word count
     val wordCounts = words.groupBy("value").count()

http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
index 333b0a9..b4dad21 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed {
       .option("host", host)
       .option("port", port)
       .option("includeTimestamp", true)
-      .load().as[(String, Timestamp)]
+      .load()
 
     // Split the lines into words, retaining timestamps
-    val words = lines.flatMap(line =>
+    val words = lines.as[(String, Timestamp)].flatMap(line =>
       line._1.split(" ").map(word => (word, line._2))
     ).toDF("word", "timestamp")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org