Posted to reviews@spark.apache.org by "HeartSaVioR (via GitHub)" <gi...@apache.org> on 2023/02/28 07:56:52 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request, #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

HeartSaVioR opened a new pull request, #40215:
URL: https://github.com/apache/spark/pull/40215

   ### What changes were proposed in this pull request?
   
   This PR proposes to add examples of the workloads unblocked by SPARK-42376, which allows a stream-stream time interval join to be followed by a stateful operator.
   
   ### Why are the changes needed?
   
   We'd like to remove the description of limitations that no longer exist, and to provide some code examples so that users can get a sense of how to use the functionality.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, documentation change.
   
   ### How was this patch tested?
   
   Built the docs locally via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed the rendered page.
   
   Screenshots:
   
   > Scala
   
   <img width="622" alt="Screenshot 2023-02-28 4.43.39 PM" src="https://user-images.githubusercontent.com/1317309/221788892-717ae695-0f24-4258-9eed-c2875c45a913.png">
   
   
   > Java
   
   <img width="621" alt="Screenshot 2023-02-28 4.44.06 PM" src="https://user-images.githubusercontent.com/1317309/221788918-70cf41d4-aaa2-457e-9f13-3a94f137e4c6.png">
   
   
   > Python
   
   <img width="617" alt="Screenshot 2023-02-28 4.45.30 PM" src="https://user-images.githubusercontent.com/1317309/221788940-0755b28d-36da-4408-a259-3bc692e3f2d1.png">
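
As a rough illustration of the workload described in this PR, the sketch below mimics in plain Python (not the Spark API) what a time interval join followed by a windowed count computes. The sample records and the helper names `interval_join` and `window_start` are hypothetical; the column meanings and the one-hour bound follow the Python snippet quoted in the diff later in this thread. It uses an inner join for brevity and ignores watermarks and state eviction, which a real streaming query depends on.

```python
from collections import Counter
from datetime import datetime, timedelta

# Hypothetical sample data: (adId, eventTime) pairs standing in for the two streams.
impressions = [
    ("ad1", datetime(2023, 2, 28, 10, 5)),
    ("ad2", datetime(2023, 2, 28, 10, 30)),
]
clicks = [
    ("ad1", datetime(2023, 2, 28, 10, 20)),  # within 1 hour of the ad1 impression
    ("ad1", datetime(2023, 2, 28, 11, 2)),   # still within 1 hour of the ad1 impression
    ("ad2", datetime(2023, 2, 28, 12, 0)),   # more than 1 hour after the ad2 impression
]


def interval_join(impressions, clicks, max_delay=timedelta(hours=1)):
    """Inner join on ad ID with impressionTime <= clickTime <= impressionTime + max_delay."""
    return [
        (ad_id, imp_time, click_time)
        for ad_id, imp_time in impressions
        for click_ad_id, click_time in clicks
        if click_ad_id == ad_id and imp_time <= click_time <= imp_time + max_delay
    ]


def window_start(ts):
    """Start of the 1-hour tumbling window containing ts."""
    return ts.replace(minute=0, second=0, microsecond=0)


joined = interval_join(impressions, clicks)

# The stateful operator after the join: count joined rows per
# (adId, 1-hour window of clickTime).
counts = Counter((ad_id, window_start(click_time)) for ad_id, _, click_time in joined)

for (ad_id, start), n in sorted(counts.items()):
    print(ad_id, start.isoformat(), n)
```

Both `ad1` clicks survive the join and land in different one-hour windows, while the `ad2` click falls outside the interval and is dropped before the aggregation sees it.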
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376
URL: https://github.com/apache/spark/pull/40215




[GitHub] [spark] HeartSaVioR commented on pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40215:
URL: https://github.com/apache/spark/pull/40215#issuecomment-1457585553

   cc. @viirya as well, who may be interested in the new feature in SS.




[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40215:
URL: https://github.com/apache/spark/pull/40215#discussion_r1127778137


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -1848,12 +1848,137 @@ Additional details on supported joins:
 
 - As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
 
-- As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of
-  what cannot be used.
+- You cannot use mapGroupsWithState and flatMapGroupsWithState before and after joins.
 
-  - Cannot use streaming aggregations before joins.
+In append output mode, you can construct a query having non-map-like operations before/after join.

Review Comment:
   Sounds better. I'll list some operations here.





[GitHub] [spark] HeartSaVioR commented on a diff in pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #40215:
URL: https://github.com/apache/spark/pull/40215#discussion_r1127779011


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -1848,12 +1848,137 @@ Additional details on supported joins:
 
 - As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
 
-- As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of
-  what cannot be used.
+- You cannot use mapGroupsWithState and flatMapGroupsWithState before and after joins.
 
-  - Cannot use streaming aggregations before joins.
+In append output mode, you can construct a query having non-map-like operations before/after join.
 
-  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
+For example, here's an example of time window aggregation in both streams followed by stream-stream join with event time window:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+
+val clicksWindow = clicksWithWatermark
+  .groupBy(window("clickTime", "1 hour"))
+  .count()
+
+val impressionsWindow = impressionsWithWatermark
+  .groupBy(window("impressionTime", "1 hour"))
+  .count()
+
+clicksWindow.join(impressionsWindow, "window", "inner")
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+
+Dataset<Row> clicksWindow = clicksWithWatermark
+  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
+  .count();
+
+Dataset<Row> impressionsWindow = impressionsWithWatermark
+  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
+  .count();
+
+clicksWindow.join(impressionsWindow, "window", "inner");
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+joined = impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+  """),
+  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
+)
+
+joined.groupBy(
+  joined.clickAdId,
+  window(joined.clickTime, "1 hour")
+).count()

Review Comment:
   Looks like the Python example got mixed up. Nice catch.





[GitHub] [spark] HeartSaVioR commented on pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40215:
URL: https://github.com/apache/spark/pull/40215#issuecomment-1447730252

   This should be merged after #39931.




[GitHub] [spark] viirya commented on a diff in pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #40215:
URL: https://github.com/apache/spark/pull/40215#discussion_r1127412999


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -1848,12 +1848,137 @@ Additional details on supported joins:
 
 - As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
 
-- As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of
-  what cannot be used.
+- You cannot use mapGroupsWithState and flatMapGroupsWithState before and after joins.
 
-  - Cannot use streaming aggregations before joins.
+In append output mode, you can construct a query having non-map-like operations before/after join.

Review Comment:
   `non-map-like operations` is not clear to me. Can you add some examples, like:
   
   ```
   In append output mode, you can construct a query having non-map-like operations, e.g. ..., ..., ..., before/after join.
   ```





[GitHub] [spark] HeartSaVioR commented on pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40215:
URL: https://github.com/apache/spark/pull/40215#issuecomment-1457584928

   cc. @zsxwing @rangadi @jerrypeng @anishshri-db @chaoqin-li1123 
   cc-ing folks who reviewed the code change PR. This PR is a doc change to show what is being unblocked, as we did in https://github.com/apache/spark/pull/40188 for the fix to broken late record filtering.




[GitHub] [spark] viirya commented on a diff in pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #40215:
URL: https://github.com/apache/spark/pull/40215#discussion_r1127438527


##########
docs/structured-streaming-programming-guide.md:
##########
@@ -1848,12 +1848,137 @@ Additional details on supported joins:
 
 - As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
 
-- As of Spark 2.4, you cannot use other non-map-like operations before joins. Here are a few examples of
-  what cannot be used.
+- You cannot use mapGroupsWithState and flatMapGroupsWithState before and after joins.
 
-  - Cannot use streaming aggregations before joins.
+In append output mode, you can construct a query having non-map-like operations before/after join.
 
-  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
+For example, here's an example of time window aggregation in both streams followed by stream-stream join with event time window:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+
+val clicksWindow = clicksWithWatermark
+  .groupBy(window("clickTime", "1 hour"))
+  .count()
+
+val impressionsWindow = impressionsWithWatermark
+  .groupBy(window("impressionTime", "1 hour"))
+  .count()
+
+clicksWindow.join(impressionsWindow, "window", "inner")
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+
+Dataset<Row> clicksWindow = clicksWithWatermark
+  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
+  .count();
+
+Dataset<Row> impressionsWindow = impressionsWithWatermark
+  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
+  .count();
+
+clicksWindow.join(impressionsWindow, "window", "inner");
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+joined = impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+  """),
+  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
+)
+
+joined.groupBy(
+  joined.clickAdId,
+  window(joined.clickTime, "1 hour")
+).count()

Review Comment:
   This Python example seems to duplicate the one at the bottom (join followed by time window aggregation).
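
The Scala and Java tabs in the hunk above aggregate each stream per window and then join the two results on the window. In plain Python (not the Spark API), and ignoring watermarks, that computation can be sketched roughly as follows. The sample timestamps and the `window_start` helper are hypothetical.

```python
from collections import Counter
from datetime import datetime

# Hypothetical sample event times standing in for the two watermarked streams.
click_times = [
    datetime(2023, 2, 28, 10, 20),
    datetime(2023, 2, 28, 10, 45),
    datetime(2023, 2, 28, 11, 2),
]
impression_times = [
    datetime(2023, 2, 28, 10, 5),
    datetime(2023, 2, 28, 12, 30),
]


def window_start(ts):
    """Start of the 1-hour tumbling window containing ts."""
    return ts.replace(minute=0, second=0, microsecond=0)


# Stateful operators before the join: one windowed count per stream.
clicks_window = Counter(window_start(ts) for ts in click_times)
impressions_window = Counter(window_start(ts) for ts in impression_times)

# Inner join on the window: keep only windows present in both aggregates.
joined = {
    start: (clicks_window[start], impressions_window[start])
    for start in sorted(clicks_window.keys() & impressions_window.keys())
}

for start, (n_clicks, n_impressions) in joined.items():
    print(start.isoformat(), n_clicks, n_impressions)
```

Only the 10:00 window appears in both aggregates, so it is the only row the join emits.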





[GitHub] [spark] HeartSaVioR commented on pull request #40215: [SPARK-42591][SS][DOCS] Add examples of unblocked workloads after SPARK-42376

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #40215:
URL: https://github.com/apache/spark/pull/40215#issuecomment-1458815185

   Thanks for reviewing! Merging to master.

