Posted to commits@spark.apache.org by do...@apache.org on 2020/01/22 05:39:30 UTC

[spark] branch branch-2.4 updated: [SPARK-30553][DOCS] fix structured-streaming java example error

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 0300d4b  [SPARK-30553][DOCS] fix structured-streaming java example error
0300d4b is described below

commit 0300d4bde7343c2e260430e4f470505e9e721ab0
Author: bettermouse <qq5375631>
AuthorDate: Tue Jan 21 21:37:21 2020 -0800

    [SPARK-30553][DOCS] fix structured-streaming java example error
    
    ### What changes were proposed in this pull request?
    
    Fix an error in the Structured Streaming Java example. The documentation currently shows:
    ```java
    Dataset<Row> windowedCounts = words
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
            words.col("word"))
        .count();
    ```
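    Code written this way does not clean up old state, which may cause an OOM. In the physical plan before the fix, the grouping key appears as `window#13` with no watermark attached; after the fix it appears as `window#13-T600000ms`.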
    
    > Before the fix
    
    ```scala
    == Physical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter48e331f0
    +- *(4) HashAggregate(keys=[window#13, word#4], functions=[count(1)], output=[window#13, word#4, count#12L])
       +- StateStoreSave [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], Update, 1579530890886, 2
          +- *(3) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L])
             +- StateStoreRestore [window#13, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state, runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6, numPartitions = 1], 2
                +- *(2) HashAggregate(keys=[window#13, word#4], functions=[merge_count(1)], output=[window#13, word#4, count#23L])
                   +- Exchange hashpartitioning(window#13, word#4, 1)
                      +- *(1) HashAggregate(keys=[window#13, word#4], functions=[partial_count(1)], output=[window#13, word#4, count#23L])
                         +- *(1) Project [window#13, word#4]
                            +- *(1) Filter (((isnotnull(timestamp#5) && isnotnull(window#13)) && (timestamp#5 >= window#13.start)) && (timestamp#5 < window#13.end))
                               +- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast((precisetimestampconversion(ti [...]
                                  +- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes
                                     +- LocalTableScan <empty>, [word#4, timestamp#5]
    ```
    
    > After the fix
    
    ```scala
    == Physical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter1df12a96
    +- *(4) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[count(1)], output=[window#8-T600000ms, word#4, count#12L])
       +- StateStoreSave [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], Update, 1579529975342, 2
          +- *(3) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L])
             +- StateStoreRestore [window#13-T600000ms, word#4], state info [ checkpoint = file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state, runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4, numPartitions = 1], 2
                +- *(2) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[merge_count(1)], output=[window#13-T600000ms, word#4, count#23L])
                   +- Exchange hashpartitioning(window#13-T600000ms, word#4, 1)
                      +- *(1) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[partial_count(1)], output=[window#13-T600000ms, word#4, count#23L])
                         +- *(1) Project [window#13-T600000ms, word#4]
                            +- *(1) Filter (((isnotnull(timestamp#5-T600000ms) && isnotnull(window#13-T600000ms)) && (timestamp#5-T600000ms >= window#13-T600000ms.start)) && (timestamp#5-T600000ms < window#13-T600000ms.end))
                               +- *(1) Expand [List(named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) as double) = (cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) THEN (CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE CEIL((cast( [...]
                                  +- EventTimeWatermark timestamp#5: timestamp, interval 10 minutes
                                     +- LocalTableScan <empty>, [word#4, timestamp#5]
    ```
    
    ### Why are the changes needed?
    If users write their code according to the documentation, old state is never cleaned up, which may cause an OOM.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    ```java
            // Requires static imports of org.apache.spark.sql.functions.col
            // and org.apache.spark.sql.functions.window.
            SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
                    .config("spark.sql.shuffle.partitions", 1)
                    .getOrCreate();
            // Read lines from a socket and attach an arrival timestamp to each one.
            Dataset<Row> lines = spark.readStream().format("socket")
                    .option("host", "skynet")
                    .option("includeTimestamp", true)
                    .option("port", 8888).load();
            Dataset<Row> words = lines.toDF("word", "timestamp");
            // Use col(...) rather than words.col(...), as in the fixed documentation example.
            Dataset<Row> windowedCounts = words
                    .withWatermark("timestamp", "10 minutes")
                    .groupBy(
                            window(col("timestamp"), "10 minutes", "5 minutes"),
                            col("word"))
                    .count();
            StreamingQuery start = windowedCounts.writeStream()
                    .outputMode("update")
                    .format("console").start();
            start.awaitTermination();
    ```
    Run an example like this, feed it some data, and then check the following:
    1. Look at the metric `stateOnCurrentVersionSizeBytes` in the log. Does it keep increasing?
    2. Look at the physical plan. Does it contain keys such as `HashAggregate(keys=[window#11-T10000ms, value#39]`?
    3. Set a breakpoint at `storeManager.remove(store, keyRow)`. Is the old state actually removed?
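    
    The plan check in item 2 can also be scripted. The following is a minimal sketch and not part of this patch: `WatermarkPlanCheck` and its methods are hypothetical names, the plan text is assumed to have been copied from the query's explain output, and the `-T<delay>ms` suffix is inferred from the plans shown above rather than from a documented format.
    
    ```java
    import java.time.Duration;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    // Hypothetical helper (not a Spark API): inspects the text of a physical plan.
    final class WatermarkPlanCheck {
    
        // Captures the contents of keys=[...] for each HashAggregate node in the plan text.
        private static final Pattern KEYS =
                Pattern.compile("HashAggregate\\(keys=\\[([^\\]]*)\\]");
    
        // Builds the suffix seen on watermarked attributes in the plans above, e.g. "-T600000ms".
        static String watermarkSuffix(Duration delay) {
            return "-T" + delay.toMillis() + "ms";
        }
    
        // Returns true only if the plan has at least one HashAggregate node and
        // every such node has a grouping key tagged with the expected suffix.
        static boolean aggregatesCarryWatermark(String plan, Duration delay) {
            String suffix = watermarkSuffix(delay);
            Matcher m = KEYS.matcher(plan);
            boolean found = false;
            while (m.find()) {
                found = true;
                if (!m.group(1).contains(suffix)) {
                    return false;
                }
            }
            return found;
        }
    
        public static void main(String[] args) {
            // Shortened plan lines modeled on the "before" and "after" plans above.
            String before = "+- *(1) HashAggregate(keys=[window#13, word#4], functions=[partial_count(1)])";
            String after = "+- *(1) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[partial_count(1)])";
            Duration delay = Duration.ofMinutes(10);
            System.out.println(aggregatesCarryWatermark(before, delay)); // false
            System.out.println(aggregatesCarryWatermark(after, delay));  // true
        }
    }
    ```
    
    For the 10-minute watermark used in this example the expected suffix is `-T600000ms`; the `-T10000ms` in item 2 would correspond to a 10-second watermark.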
    
    Closes #27268 from bettermouse/spark-30553.
    
    Authored-by: bettermouse <qq5375631>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 3c4e61918fc8266368bd33ea0612144de77571e6)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 docs/structured-streaming-programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index abff126c..3f77f89 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -957,8 +957,8 @@ Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp
 Dataset<Row> windowedCounts = words
     .withWatermark("timestamp", "10 minutes")
     .groupBy(
-        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
-        words.col("word"))
+        window(col("timestamp"), "10 minutes", "5 minutes"),
+        col("word"))
     .count();
 {% endhighlight %}
 

