Posted to commits@spark.apache.org by sr...@apache.org on 2016/08/11 10:31:55 UTC
spark git commit: [SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and indentation in documentation
Repository: spark
Updated Branches:
refs/heads/master a45fefd17 -> 7186e8c31
[SPARK-16886][EXAMPLES][DOC] Fix some examples to be consistent and indentation in documentation
## What changes were proposed in this pull request?
Originally this PR was based on #14491, but I realised that fixing the examples is more sensible than changing the comments.
This PR fixes the three things below:
- Fix two wrong examples in `structured-streaming-programming-guide.md`. Loading via `read.load(..)` without `as` produces a `Dataset<Row>`, not a `Dataset<String>`, in Java (see the sketch after this list).
- Fix indentation across `structured-streaming-programming-guide.md`. Python uses 4 spaces while Scala and Java use 2, and the examples were inconsistent about this.
- Fix `StructuredNetworkWordCountWindowed` and `StructuredNetworkWordCount` in Java and Scala to initially load a `DataFrame`/`Dataset<Row>`, so that they are consistent with the comments and some examples in `structured-streaming-programming-guide.md`, and so that the Scala and Java examples match the Python one (which initially loads a `DataFrame`).
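For reference, a minimal sketch of the corrected Java pattern that the guide and examples now follow (localhost:9999 is just the example's default socket source): `load()` returns an untyped `Dataset<Row>` (a streaming DataFrame), and the conversion to a typed `Dataset<String>` is made explicit with `as(Encoders.STRING())` before `flatMap`.

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

// Create DataFrame representing the stream of input lines from connection to localhost:9999.
// load() yields an untyped Dataset<Row>, which is what the surrounding comments describe.
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Convert to a typed Dataset<String> explicitly, then split the lines into words.
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap(
    new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING());

// Generate running word count.
Dataset<Row> wordCounts = words.groupBy("value").count();
```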
## How was this patch tested?
N/A
Closes https://github.com/apache/spark/pull/14491
Author: hyukjinkwon <gu...@gmail.com>
Author: Ganesh Chand <ga...@Ganeshs-MacBook-Pro-2.local>
Closes #14564 from HyukjinKwon/SPARK-16886.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7186e8c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7186e8c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7186e8c3
Branch: refs/heads/master
Commit: 7186e8c3180b7f38250cf2f2de791472bf5325a5
Parents: a45fefd
Author: hyukjinkwon <gu...@gmail.com>
Authored: Thu Aug 11 11:31:52 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Aug 11 11:31:52 2016 +0100
----------------------------------------------------------------------
docs/structured-streaming-programming-guide.md | 202 +++++++++----------
.../JavaStructuredNetworkWordCount.java | 6 +-
.../JavaStructuredNetworkWordCountWindowed.java | 30 +--
.../streaming/StructuredNetworkWordCount.scala | 4 +-
.../StructuredNetworkWordCountWindowed.scala | 4 +-
5 files changed, 124 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 8c14c3d..99d50e5 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -46,9 +46,9 @@ import java.util.Arrays;
import java.util.Iterator;
SparkSession spark = SparkSession
- .builder()
- .appName("JavaStructuredNetworkWordCount")
- .getOrCreate();
+ .builder()
+ .appName("JavaStructuredNetworkWordCount")
+ .getOrCreate();
{% endhighlight %}
</div>
@@ -95,7 +95,7 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
{% highlight java %}
// Create DataFrame representing the stream of input lines from connection to localhost:9999
-Dataset<String> lines = spark
+Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
@@ -104,14 +104,14 @@ Dataset<String> lines = spark
// Split the lines into words
Dataset<String> words = lines
- .as(Encoders.STRING())
- .flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING());
+ .as(Encoders.STRING())
+ .flatMap(
+ new FlatMapFunction<String, String>() {
+ @Override
+ public Iterator<String> call(String x) {
+ return Arrays.asList(x.split(" ")).iterator();
+ }
+ }, Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -125,11 +125,11 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
{% highlight python %}
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark\
- .readStream\
- .format('socket')\
- .option('host', 'localhost')\
- .option('port', 9999)\
- .load()
+ .readStream\
+ .format('socket')\
+ .option('host', 'localhost')\
+ .option('port', 9999)\
+ .load()
# Split the lines into words
words = lines.select(
@@ -434,11 +434,11 @@ val spark: SparkSession = ...
// Read text from socket
val socketDF = spark
- .readStream
- .format("socket")
- .option("host", "localhost")
- .option("port", 9999)
- .load()
+ .readStream
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 9999)
+ .load()
socketDF.isStreaming // Returns True for DataFrames that have streaming sources
@@ -447,10 +447,10 @@ socketDF.printSchema
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
- .readStream
- .option("sep", ";")
- .schema(userSchema) // Specify schema of the csv files
- .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
+ .readStream
+ .option("sep", ";")
+ .schema(userSchema) // Specify schema of the csv files
+ .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -461,11 +461,11 @@ SparkSession spark = ...
// Read text from socket
Dataset[Row] socketDF = spark
- .readStream()
- .format("socket")
- .option("host", "localhost")
- .option("port", 9999)
- .load();
+ .readStream()
+ .format("socket")
+ .option("host", "localhost")
+ .option("port", 9999)
+ .load();
socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources
@@ -474,10 +474,10 @@ socketDF.printSchema();
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset[Row] csvDF = spark
- .readStream()
- .option("sep", ";")
- .schema(userSchema) // Specify schema of the csv files
- .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
+ .readStream()
+ .option("sep", ";")
+ .schema(userSchema) // Specify schema of the csv files
+ .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
{% endhighlight %}
</div>
@@ -549,12 +549,12 @@ import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
public class DeviceData {
- private String device;
- private String type;
- private Double signal;
- private java.sql.Date time;
- ...
- // Getter and setter methods for each field
+ private String device;
+ private String type;
+ private Double signal;
+ private java.sql.Date time;
+ ...
+ // Getter and setter methods for each field
}
Dataset<Row> df = ...; // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
@@ -828,33 +828,33 @@ val noAggDF = deviceDataDf.select("device").where("signal > 10")
// Print new data to console
noAggDF
- .writeStream
- .format("console")
- .start()
+ .writeStream
+ .format("console")
+ .start()
// Write new data to Parquet files
noAggDF
- .writeStream
- .parquet("path/to/destination/directory")
- .start()
+ .writeStream
+ .parquet("path/to/destination/directory")
+ .start()
// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()
// Print updated aggregations to console
aggDF
- .writeStream
- .outputMode("complete")
- .format("console")
- .start()
+ .writeStream
+ .outputMode("complete")
+ .format("console")
+ .start()
// Have all the aggregates in an in-memory table
aggDF
- .writeStream
- .queryName("aggregates") // this query name will be the table name
- .outputMode("complete")
- .format("memory")
- .start()
+ .writeStream
+ .queryName("aggregates") // this query name will be the table name
+ .outputMode("complete")
+ .format("memory")
+ .start()
spark.sql("select * from aggregates").show() // interactively query in-memory table
{% endhighlight %}
@@ -868,33 +868,33 @@ Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
// Print new data to console
noAggDF
- .writeStream()
- .format("console")
- .start();
+ .writeStream()
+ .format("console")
+ .start();
// Write new data to Parquet files
noAggDF
- .writeStream()
- .parquet("path/to/destination/directory")
- .start();
+ .writeStream()
+ .parquet("path/to/destination/directory")
+ .start();
// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();
// Print updated aggregations to console
aggDF
- .writeStream()
- .outputMode("complete")
- .format("console")
- .start();
+ .writeStream()
+ .outputMode("complete")
+ .format("console")
+ .start();
// Have all the aggregates in an in-memory table
aggDF
- .writeStream()
- .queryName("aggregates") // this query name will be the table name
- .outputMode("complete")
- .format("memory")
- .start();
+ .writeStream()
+ .queryName("aggregates") // this query name will be the table name
+ .outputMode("complete")
+ .format("memory")
+ .start();
spark.sql("select * from aggregates").show(); // interactively query in-memory table
{% endhighlight %}
@@ -908,33 +908,33 @@ noAggDF = deviceDataDf.select("device").where("signal > 10")
# Print new data to console
noAggDF\
- .writeStream()\
- .format("console")\
- .start()
+ .writeStream()\
+ .format("console")\
+ .start()
# Write new data to Parquet files
noAggDF\
- .writeStream()\
- .parquet("path/to/destination/directory")\
- .start()
+ .writeStream()\
+ .parquet("path/to/destination/directory")\
+ .start()
# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()
# Print updated aggregations to console
aggDF\
- .writeStream()\
- .outputMode("complete")\
- .format("console")\
- .start()
+ .writeStream()\
+ .outputMode("complete")\
+ .format("console")\
+ .start()
# Have all the aggregates in an in memory table. The query name will be the table name
aggDF\
- .writeStream()\
- .queryName("aggregates")\
- .outputMode("complete")\
- .format("memory")\
- .start()
+ .writeStream()\
+ .queryName("aggregates")\
+ .outputMode("complete")\
+ .format("memory")\
+ .start()
spark.sql("select * from aggregates").show() # interactively query in-memory table
{% endhighlight %}
@@ -1093,11 +1093,11 @@ In case of a failure or intentional shutdown, you can recover the previous progr
{% highlight scala %}
aggDF
- .writeStream
- .outputMode("complete")
- .option("checkpointLocation", "path/to/HDFS/dir")
- .format("memory")
- .start()
+ .writeStream
+ .outputMode("complete")
+ .option("checkpointLocation", "path/to/HDFS/dir")
+ .format("memory")
+ .start()
{% endhighlight %}
</div>
@@ -1105,11 +1105,11 @@ aggDF
{% highlight java %}
aggDF
- .writeStream()
- .outputMode("complete")
- .option("checkpointLocation", "path/to/HDFS/dir")
- .format("memory")
- .start();
+ .writeStream()
+ .outputMode("complete")
+ .option("checkpointLocation", "path/to/HDFS/dir")
+ .format("memory")
+ .start();
{% endhighlight %}
</div>
@@ -1117,11 +1117,11 @@ aggDF
{% highlight python %}
aggDF\
- .writeStream()\
- .outputMode("complete")\
- .option("checkpointLocation", "path/to/HDFS/dir")\
- .format("memory")\
- .start()
+ .writeStream()\
+ .outputMode("complete")\
+ .option("checkpointLocation", "path/to/HDFS/dir")\
+ .format("memory")\
+ .start()
{% endhighlight %}
</div>
http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
index 346d218..c913ee0 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java
@@ -53,15 +53,15 @@ public final class JavaStructuredNetworkWordCount {
.getOrCreate();
// Create DataFrame representing the stream of input lines from connection to host:port
- Dataset<String> lines = spark
+ Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
- .load().as(Encoders.STRING());
+ .load();
// Split the lines into words
- Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ Dataset<String> words = lines.as(Encoders.STRING()).flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
index 557d36c..172d053 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java
@@ -75,28 +75,30 @@ public final class JavaStructuredNetworkWordCountWindowed {
.getOrCreate();
// Create DataFrame representing the stream of input lines from connection to host:port
- Dataset<Tuple2<String, Timestamp>> lines = spark
+ Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
- .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+ .load();
// Split the lines into words, retaining timestamps
- Dataset<Row> words = lines.flatMap(
- new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
- @Override
- public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
- List<Tuple2<String, Timestamp>> result = new ArrayList<>();
- for (String word : t._1.split(" ")) {
- result.add(new Tuple2<>(word, t._2));
+ Dataset<Row> words = lines
+ .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()))
+ .flatMap(
+ new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+ @Override
+ public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+ List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+ for (String word : t._1.split(" ")) {
+ result.add(new Tuple2<>(word, t._2));
+ }
+ return result.iterator();
}
- return result.iterator();
- }
- },
- Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
- ).toDF("word", "timestamp");
+ },
+ Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+ ).toDF("word", "timestamp");
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
index 364bff2..f0756c4 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala
@@ -56,10 +56,10 @@ object StructuredNetworkWordCount {
.format("socket")
.option("host", host)
.option("port", port)
- .load().as[String]
+ .load()
// Split the lines into words
- val words = lines.flatMap(_.split(" "))
+ val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
http://git-wip-us.apache.org/repos/asf/spark/blob/7186e8c3/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
index 333b0a9..b4dad21 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala
@@ -78,10 +78,10 @@ object StructuredNetworkWordCountWindowed {
.option("host", host)
.option("port", port)
.option("includeTimestamp", true)
- .load().as[(String, Timestamp)]
+ .load()
// Split the lines into words, retaining timestamps
- val words = lines.flatMap(line =>
+ val words = lines.as[(String, Timestamp)].flatMap(line =>
line._1.split(" ").map(word => (word, line._2))
).toDF("word", "timestamp")