You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tdas <gi...@git.apache.org> on 2018/01/13 02:50:52 UTC

[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/20255

    [SPARK-23064][DOCS][SS] Added documentation for stream-stream joins

    ## What changes were proposed in this pull request?
    Added documentation for stream-stream joins
    ![image](https://user-images.githubusercontent.com/663212/34902103-6ad55434-f7c9-11e7-9011-3bdedb46c4aa.png)
    ![image](https://user-images.githubusercontent.com/663212/34902106-75793cde-f7c9-11e7-8dbc-95236ffc8b77.png)
    ![image](https://user-images.githubusercontent.com/663212/34902108-7ebe5e8c-f7c9-11e7-901d-ae9a6665c8aa.png)
    
    ## How was this patch tested?
    
    N/a

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark join-docs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20255.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20255
    
----
commit 1335a6d3c00ccc04e7a2943ce041994150e8f9db
Author: Tathagata Das <ta...@...>
Date:   2018-01-13T02:48:30Z

    Join docs

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86214 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86214/testReport)** for PR 20255 at commit [`b8381ef`](https://github.com/apache/spark/commit/b8381ef4bbd05de718f8097d136ed332de136346).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86215/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r161925337
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -2142,6 +2452,7 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
     
     **Talks**
     
    -- Spark Summit 2017 Talk - [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
    +- Spark Summit Europe 2017 Talks -
    +  - [Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark](https://spark-summit.org/2017/events/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark/)
    --- End diff --
    
    TODO: this link needs to be updated. blocked on some links not working on the spark summit website.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86073/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86219 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86219/testReport)** for PR 20255 at commit [`68f30d0`](https://github.com/apache/spark/commit/68f30d02a7b66a56ba438f9a21e222149509d174).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r161920813
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer join are not supported as the incomplete view of
    +all data in a stream makes it infeasible to calculate the results correctly.
    +These are discussed at the end of this section.
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required for matches with the other input. This constraint
    +can either be a time range condition (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "
    +  ));
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    --- End diff --
    
    Thank you!



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86214 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86214/testReport)** for PR 20255 at commit [`b8381ef`](https://github.com/apache/spark/commit/b8381ef4bbd05de718f8097d136ed332de136346).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86303 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86303/testReport)** for PR 20255 at commit [`e39b0a6`](https://github.com/apache/spark/commit/e39b0a63010e5ac1372966c2d7b8fc8eefa821c3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86219 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86219/testReport)** for PR 20255 at commit [`68f30d0`](https://github.com/apache/spark/commit/68f30d02a7b66a56ba438f9a21e222149509d174).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r162261740
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer join are not supported as the incomplete view of
    +all data in a stream makes it infeasible to calculate the results correctly.
    +These are discussed at the end of this section.
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required for matches with the other input. This constraint
    +can either be a time range condition (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "
    +  ));
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    --- End diff --
    
    sure!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r161366935
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer join are not supported as the incomplete view of
    +all data in a stream makes it infeasible to calculate the results correctly.
    +These are discussed at the end of this section.
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required for matches with the other input. This constraint
    +can either be a time range condition (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "
    +  ));
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    --- End diff --
    
    this *should* just work for R, like this:
    
    (I added withWatermark in 2.3)
    
    ```
    impressions <- read.stream( ...
    clicks <- read.stream( ...
    
    # Apply watermarks on event-time columns
    impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
    clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")
    
    # Join with event-time constraints
    impressionsWithWatermark.join(
       clicksWithWatermark,
       expr(
         "clickAdId = impressionAdId AND
           clickTime >= impressionTime AND
           clickTime <= impressionTime + interval 1 hour"
    ))
    ```



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by felixcheung <gi...@git.apache.org>.
Github user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r161366948
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer join are not supported as the incomplete view of
    +all data in a stream makes it infeasible to calculate the results correctly.
    +These are discussed at the end of this section.
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required for matches with the other input. This constraint
    +can either be a time range condition (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "
    +  ));
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +##### Outer Joins with Watermarking
    +While the watermark + event-time constraints is optional for inner joins, for left and right outer
    +joins they must be specified. This is because for generating the NULL results in outer join, the
    +engine must know when an input row is not going to match with anything in future. Hence, the
    +watermark + event-time constraints must be specified for generating correct results.
    +
    +However, note that the outer NULL results will be generated with a delay (depends on the specified
    +watermark delay and the time range condition) because the engine has to wait for that long to ensure
    + there were no matches and there will be no more matches in future.
    --- End diff --
    
    extra space?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86215 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86215/testReport)** for PR 20255 at commit [`0af12a3`](https://github.com/apache/spark/commit/0af12a3e63d9394ae9b86f7757d4d85828b4ce0b).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86219/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86073 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86073/testReport)** for PR 20255 at commit [`1335a6d`](https://github.com/apache/spark/commit/1335a6d3c00ccc04e7a2943ce041994150e8f9db).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20255


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Merging to master and 2.3.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86215 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86215/testReport)** for PR 20255 at commit [`0af12a3`](https://github.com/apache/spark/commit/0af12a3e63d9394ae9b86f7757d4d85828b4ce0b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r162212307
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer joins are not yet supported.
    +These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required (i.e. will not satisfy the time constraint) for
    +matches with the other input. This constraint can be defined in one of the two ways.
    +
    +    1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +
    +    1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """)
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour ")
    +);
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """)
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +##### Outer Joins with Watermarking
    +While the watermark + event-time constraints is optional for inner joins, for left and right outer
    +joins they must be specified. This is because for generating the NULL results in outer join, the
    +engine must know when an input row is not going to match with anything in future. Hence, the
    +watermark + event-time constraints must be specified for generating correct results. Therefore,
    +a query with outer-join will look quite like the ad-monetization example earlier, except that
    +there will be an additional parameter specifying it to be an outer-join.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """),
    +  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
    + )
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "),
    +  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
    +);
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """),
    +  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +However, note that the outer NULL results will be generated with a delay (depends on the specified
    +watermark delay and the time range condition) because the engine has to wait for that long to ensure
    +there were no matches and there will be no more matches in future.
    +
    +##### Support matrix for joins in streaming queries
    +
    +<table class ="table">
    +  <tr>
    +    <th>Left Input</th>
    +    <th>Right Input</th>
    +    <th>Join Type</th>
    +    <th></th>
    +  </tr>
    +  <tr>
    +      <td style="vertical-align: middle;">Static</td>
    +      <td style="vertical-align: middle;">Static</td>
    +      <td style="vertical-align: middle;">All types</td>
    +      <td style="vertical-align: middle;">
    +        Supported, since its not on streaming data even though it
    +        can be present in a streaming query
    +      </td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td rowspan="4" style="vertical-align: middle;">Static</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Static</td>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">
    +      Supported, optionally specify watermark on both sides +
    +      time constraints for state cleanup<
    --- End diff --
    
    nit" remove `<`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86214/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r161920893
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer join are not supported as the incomplete view of
    +all data in a stream makes it infeasible to calculate the results correctly.
    +These are discussed at the end of this section.
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required for matches with the other input. This constraint
    +can either be a time range condition (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +or equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "
    +  ));
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """
    +  ))
    --- End diff --
    
    Could you add tests for joins in R as well? :)


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20255: [SPARK-23064][DOCS][SS] Added documentation for s...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r162199330
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is necessary.
    +However, a few types of stream-static outer joins are not yet supported.
    +These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides of the join making
    +it much harder to find matches between inputs. Any row received from one input stream can match
    +with any future, yet-to-be-received row from the other input stream. Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match every future input with
    +past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions are supported.
    +However, as the stream runs, the size of streaming state will keep growing indefinitely as
    +*all* past input must be saved as the any new input can match with any input from the past.
    +To avoid unbounded state, you have to define additional join conditions such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared from the state.
    +In other words, you will have to do the following additional steps in the join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the engine can figure out when
    +old rows of one input is not going to be required (i.e. will not satisfy the time constraint) for
    +matches with the other input. This constraint can be defined in one of the two ways.
    +
    +    1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEN rightTime AND rightTime + INTERVAL 1 HOUR`),
    +
    +    1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
    +
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
    +another stream of user clicks on advertisements to correlate when impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """)
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour ")
    +);
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """)
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +##### Outer Joins with Watermarking
    +While the watermark + event-time constraints is optional for inner joins, for left and right outer
    +joins they must be specified. This is because for generating the NULL results in outer join, the
    +engine must know when an input row is not going to match with anything in future. Hence, the
    +watermark + event-time constraints must be specified for generating correct results. Therefore,
    +a query with outer-join will look quite like the ad-monetization example earlier, except that
    +there will be an additional parameter specifying it to be an outer-join.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """),
    +  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
    + )
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "),
    +  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
    +);
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """),
    +  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +However, note that the outer NULL results will be generated with a delay (depends on the specified
    +watermark delay and the time range condition) because the engine has to wait for that long to ensure
    +there were no matches and there will be no more matches in future.
    +
    +##### Support matrix for joins in streaming queries
    +
    +<table class ="table">
    +  <tr>
    +    <th>Left Input</th>
    +    <th>Right Input</th>
    +    <th>Join Type</th>
    +    <th></th>
    +  </tr>
    +  <tr>
    +      <td style="vertical-align: middle;">Static</td>
    +      <td style="vertical-align: middle;">Static</td>
    +      <td style="vertical-align: middle;">All types</td>
    +      <td style="vertical-align: middle;">
    +        Supported, since its not on streaming data even though it
    +        can be present in a streaming query
    +      </td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td rowspan="4" style="vertical-align: middle;">Static</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Static</td>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">
    +      Supported, optionally specify watermark on both sides +
    +      time constraints for state cleanup<
    +    </td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">
    +      Conditionally supported, must specify watermark on right + time constraints for correct
    +      results, optionally specify watermark on left for all state cleanup
    +    </td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">
    +      Conditionally supported, must specify watermark on left + time constraints for correct
    +      results, optionally specify watermark on right for all state cleanup
    +    </td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    + <tr>
    +    <td></td>
    +    <td></td>
    +    <td></td>
    +    <td></td>
    +  </tr>
    +</table>
    +
    +Additional details on supported joins:
    +
    +- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.
    +
    +- As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
    +
    +- As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of
    +  what cannot be used.
    +
    +  - Cannot use streaming aggregations before joins.
    +
    +  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode cannot before joins.
    --- End diff --
    
    nit: ~~cannot~~ before joins.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86303 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86303/testReport)** for PR 20255 at commit [`e39b0a6`](https://github.com/apache/spark/commit/e39b0a63010e5ac1372966c2d7b8fc8eefa821c3).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    **[Test build #86073 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86073/testReport)** for PR 20255 at commit [`1335a6d`](https://github.com/apache/spark/commit/1335a6d3c00ccc04e7a2943ce041994150e8f9db).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20255: [SPARK-23064][DOCS][SS] Added documentation for stream-s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20255
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86303/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org