Posted to commits@spark.apache.org by do...@apache.org on 2020/01/22 05:39:30 UTC
[spark] branch branch-2.4 updated: [SPARK-30553][DOCS] fix
structured-streaming java example error
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 0300d4b [SPARK-30553][DOCS] fix structured-streaming java example error
0300d4b is described below
commit 0300d4bde7343c2e260430e4f470505e9e721ab0
Author: bettermouse <qq5375631>
AuthorDate: Tue Jan 21 21:37:21 2020 -0800
[SPARK-30553][DOCS] fix structured-streaming java example error
### What changes were proposed in this pull request?
Fix an error in the Structured Streaming Java example. The documentation currently shows:
```java
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
```
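With this code, old state is never cleaned up, which may cause an OOM error. The columns obtained through `words.col(...)` do not carry the watermark set by `withWatermark`, as the plans below show: before the fix the grouping keys have no watermark delay (`window#13`), while after the fix they do (`window#13-T600000ms`).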
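To illustrate why a missing watermark on the grouping key matters, here is a toy model in plain Java. It is only a sketch under stated assumptions, not Spark's implementation: the class `WatermarkStateSketch` and all of its members are hypothetical, and each event is treated as its own micro-batch. It uses the same 10-minute window, 5-minute slide, and 10-minute delay as the example, and compares state that ignores the watermark with state that evicts windows once they end at or before the watermark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of watermark-driven state cleanup (hypothetical, not Spark code). */
final class WatermarkStateSketch {
    public static final long WINDOW_MS = 10 * 60_000L; // window length: 10 minutes
    public static final long SLIDE_MS = 5 * 60_000L;   // slide interval: 5 minutes
    public static final long DELAY_MS = 10 * 60_000L;  // watermark delay: 10 minutes

    // Aggregation state: "windowStart|word" -> count.
    private final Map<String, Long> counts = new HashMap<>();
    private final boolean watermarkAware;
    private long watermark = Long.MIN_VALUE;

    public WatermarkStateSketch(boolean watermarkAware) {
        this.watermarkAware = watermarkAware;
    }

    /** Start times of every sliding window that contains eventTime. */
    public static List<Long> windowStarts(long eventTime) {
        List<Long> starts = new ArrayList<>();
        long lastStart = eventTime - Math.floorMod(eventTime, SLIDE_MS);
        for (long s = lastStart; s > eventTime - WINDOW_MS; s -= SLIDE_MS) {
            starts.add(s);
        }
        return starts;
    }

    private static long windowEnd(String key) {
        return Long.parseLong(key.substring(0, key.indexOf('|'))) + WINDOW_MS;
    }

    /** Counts one event, then (only if watermark-aware) evicts expired windows. */
    public void process(String word, long eventTime) {
        for (long start : windowStarts(eventTime)) {
            // Skip windows that already ended at or before the current watermark.
            if (start + WINDOW_MS > watermark) {
                counts.merge(start + "|" + word, 1L, Long::sum);
            }
        }
        if (watermarkAware) {
            watermark = Math.max(watermark, eventTime - DELAY_MS);
            counts.keySet().removeIf(key -> windowEnd(key) <= watermark);
        }
    }

    public int stateSize() {
        return counts.size();
    }

    public static void main(String[] args) {
        WatermarkStateSketch naive = new WatermarkStateSketch(false);
        WatermarkStateSketch aware = new WatermarkStateSketch(true);
        // One event every 5 minutes, in event-time order.
        for (int i = 0; i < 100; i++) {
            long eventTime = i * SLIDE_MS;
            naive.process("word", eventTime);
            aware.process("word", eventTime);
        }
        System.out.println("state entries without watermark: " + naive.stateSize());
        System.out.println("state entries with watermark:    " + aware.stateSize());
    }
}
```

In this model the state that ignores the watermark grows with every new window, while the watermark-aware state stays bounded by the number of windows that can still receive data.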
> Before the fix
```text
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter48e331f0
+- *(4) HashAggregate(keys=[window#13, word#4], functions=[count(1)], output=[window#13, word#4, count#12L])
+- StateStoreSave [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], Update, 1579530890886, 2
+- *(3) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L])
+- StateStoreRestore [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L])
+- Exchange hashpartitioning(window#13, word#4, 1)
+- *(1) HashAggregate(keys=[window#13, word#4], functions=[partial_count(1)], output=[window#13, word#4, count#23L])
+- *(1) Project [window#13, word#4]
+- *(1) Filter (((isnotnull(timestamp#5) && isnotnull(window#13)) && (timestamp#5 >= window#13.start)) && (timestamp#5 < window#13.end))
+- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(ti [...]
+- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes
+- LocalTableScan <empty>, [word#4, timestamp#5]
```
> After the fix
```text
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter1df12a96
+- *(4) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[count(1)], output=[window#8-T600000ms, word#4, count#12L])
+- StateStoreSave [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], Update, 1579529975342, 2
+- *(3) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L])
+- StateStoreRestore [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], 2
+- *(2) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L])
+- Exchange hashpartitioning(window#13-T600000ms, word#4, 1)
+- *(1) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[partial_count(1)], output=[window#13-T600000ms, word#4, count#23L])
+- *(1) Project [window#13-T600000ms, word#4]
+- *(1) Filter (((isnotnull(timestamp#5-T600000ms) && isnotnull(window#13-T600000ms)) && (timestamp#5-T600000ms >= window#13-T600000ms.start)) && (timestamp#5-T600000ms < window#13-T600000ms.end))
+- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast( [...]
+- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes
+- LocalTableScan <empty>, [word#4, timestamp#5]
```
### Why are the changes needed?
Code written according to the documentation does not clean up old state, which may cause an OOM error.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
```java
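import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

// A single shuffle partition keeps the physical plan and state metrics easy to read.
SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
    .config("spark.sql.shuffle.partitions", 1)
    .getOrCreate();
// Socket source; includeTimestamp adds a timestamp column next to each line.
Dataset<Row> lines = spark.readStream().format("socket")
    .option("host", "skynet")
    .option("includeTimestamp", true)
    .option("port", 8888).load();
Dataset<Row> words = lines.toDF("word", "timestamp");
// Unqualified col(...) resolves against the watermarked Dataset, so the keys keep the watermark.
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
StreamingQuery start = windowedCounts.writeStream()
    .outputMode("update")
    .format("console").start();
start.awaitTermination();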
```
We can write an example like this, feed it some data, and then check the following:
1. Look at the `stateOnCurrentVersionSizeBytes` metric in the log. Is it increasing all the time?
2. Look at the physical plan. Does it contain something like `HashAggregate(keys=[window#11-T10000ms, value#39]`?
3. Debug into `storeManager.remove(store, keyRow)`. Does it remove the old state?
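The first two checks can be scripted roughly as follows. This is a minimal sketch in plain Java, not part of the patch: the class `StateCleanupChecks` and its methods are hypothetical, the plan is assumed to be available as text (for example copied from the output of `explain()`), and the metric samples are assumed to have been collected by hand from the progress log.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helpers for checking whether state cleanup is in effect. */
final class StateCleanupChecks {
    // Captures the keys=[...] list of each HashAggregate node in the plan text.
    private static final Pattern AGG_KEYS =
        Pattern.compile("HashAggregate\\(keys=\\[([^\\]]*)\\]");
    // An attribute with a watermark delay suffix, e.g. window#13-T600000ms.
    private static final Pattern WATERMARKED = Pattern.compile("#\\d+-T\\d+ms");

    /** True if the plan has HashAggregate nodes and each has a watermarked key. */
    public static boolean aggregatesCarryWatermark(String plan) {
        Matcher m = AGG_KEYS.matcher(plan);
        boolean sawAggregate = false;
        while (m.find()) {
            sawAggregate = true;
            if (!WATERMARKED.matcher(m.group(1)).find()) {
                return false;
            }
        }
        return sawAggregate;
    }

    /** Heuristic: true if the samples never decrease and end above where they began. */
    public static boolean keepsGrowing(long[] stateSizeBytes) {
        if (stateSizeBytes.length < 2) {
            return false;
        }
        for (int i = 1; i < stateSizeBytes.length; i++) {
            if (stateSizeBytes[i] < stateSizeBytes[i - 1]) {
                return false;
            }
        }
        return stateSizeBytes[stateSizeBytes.length - 1] > stateSizeBytes[0];
    }

    public static void main(String[] args) {
        String before = "HashAggregate(keys=[window#13, word#4], functions=[count(1)])";
        String after = "HashAggregate(keys=[window#13-T600000ms, word#4], functions=[count(1)])";
        System.out.println("before fix carries watermark: " + aggregatesCarryWatermark(before));
        System.out.println("after fix carries watermark:  " + aggregatesCarryWatermark(after));
    }
}
```

A plan without the `-T...ms` suffix on its aggregate keys, together with a state size that only ever grows, suggests that old state is not being removed.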
Closes #27268 from bettermouse/spark-30553.
Authored-by: bettermouse <qq5375631>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 3c4e61918fc8266368bd33ea0612144de77571e6)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
docs/structured-streaming-programming-guide.md | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index abff126c..3f77f89 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -957,8 +957,8 @@ Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
- functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
- words.col("word"))
+ window(col("timestamp"), "10 minutes", "5 minutes"),
+ col("word"))
.count();
{% endhighlight %}