Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/16 03:51:26 UTC

[GitHub] [spark] viirya opened a new pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

viirya opened a new pull request #31570:
URL: https://github.com/apache/spark/pull/31570


   
   ### What changes were proposed in this pull request?
   
   This syncs the work from #22583 up with the master branch. It is currently only for testing purposes and to keep up with the latest master.
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR edited a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-803218591


   I reran the performance tests I had done before, and I observed an outstanding difference between my revised PR (my PR + the state format used here) and this PR.
   
   > revised my PR (versioned as 3.2.0-SPARK-10816-heartsavior)
   
   https://github.com/HeartSaVioR/spark/tree/SPARK-10816-heartsavior-rebase-apply-PR-31570-versioned
   
   > this PR (versioned as 3.2.0-PR-31570)
   
   https://github.com/HeartSaVioR/spark/tree/PR-31570-versioned
   
   > benchmark code
   
   https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/tree/benchmarking-SPARK-10816
   
   I built the benchmark code against locally installed Spark artifacts for both (that is, I built the benchmark code separately for each).
   
   Simply change build.sbt to update the Spark version to the custom one, and run `sbt clean assembly`.
   
   > machine to run benchmark
   
   * AMD Ryzen 5600X (no overclock, 3.7 GHz to 4.6 GHz, 6 physical cores, 12 logical cores)
   * DDR4 3200 MHz 16 GB * 2
   * Ubuntu 20.04
   
   Using `local[*]` made performance unstable, so I fixed the value to 8. There aren't many physical cores, so I also reduced the number of shuffle partitions to 5.
   
   > plenty of rows in session
   
   ```
   ./bin/spark-submit --master "local[8]" --conf spark.sql.shuffle.partitions=5 --driver-memory 16g --class com.hortonworks.spark.benchmark.streaming.sessionwindow.plenty_of_rows_in_session.BenchmarkSessionWindowListenerWordCountSessionFunctionAppendMode ./iot-trucking-app-spark-structured-streaming-<version>.jar --query-status-file /tmp/a.json --rate-row-per-second 200000 --rate-ramp-up-time-second 10
   ```
   
   [plenty-of-rows-in-session-append-mode-mine-rate-200000-v1.txt](https://github.com/apache/spark/files/6174672/plenty-of-rows-in-session-append-mode-mine-rate-200000-v1.txt)
   
   [plenty-of-rows-in-session-append-mode-PR-31570-rate-200000-v1.txt](https://github.com/apache/spark/files/6174674/plenty-of-rows-in-session-append-mode-PR-31570-rate-200000-v1.txt)
   
   * Mine showed 160,000+ processedRowsPerSecond.
   * PR-31570 didn't reach 60,000 processedRowsPerSecond.
   
   Mine was 150% (2.5x) ~ 200% (3x) faster.
   
   > plenty of keys
   
   ```
   ./bin/spark-submit --master "local[8]" --conf spark.sql.shuffle.partitions=5 --driver-memory 16g --class com.hortonworks.spark.benchmark.streaming.sessionwindow.plenty_of_keys.BenchmarkSessionWindowListenerWordCountSessionFunctionAppendMode ./iot-trucking-app-spark-structured-streaming-<version>.jar --query-status-file /tmp/b.json --rate-row-per-second 15000000 --rate-ramp-up-time-second 10
   ```
   
   [plenty-of-keys-append-mode-mine-rate-15000000-v1.txt](https://github.com/apache/spark/files/6176511/plenty-of-keys-append-mode-mine-rate-15000000-v1.txt)
   
   [plenty-of-keys-append-mode-PR-31570-rate-15000000-v1.txt](https://github.com/apache/spark/files/6176510/plenty-of-keys-append-mode-PR-31570-rate-15000000-v1.txt)
   
   * Mine showed over 13,000,000 processedRowsPerSecond (the max peak exceeded 15,000,000).
   * PR-31570 didn't reach 9,000,000 processedRowsPerSecond.
   
   Mine was around 50%+ (1.5x) faster.
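For reference, the ratios behind these percentages can be restated in a few lines of Scala. This is only arithmetic on the quoted numbers, taken at face value: the "mine" figures are lower bounds and the PR-31570 figures are upper bounds, so each ratio is a lower bound on the speedup. "Percent faster" is assumed to mean (ratio - 1) * 100, which matches the "150% (2.5x)" phrasing above.

```scala
object SpeedupBounds {
  /** Throughput ratio: 2.0 means twice the processedRowsPerSecond of the other build. */
  def speedup(mineRowsPerSec: Double, otherRowsPerSec: Double): Double =
    mineRowsPerSec / otherRowsPerSec

  /** Converts a throughput ratio into "percent faster", e.g. 2.5x -> 150%. */
  def percentFaster(ratio: Double): Double = (ratio - 1.0) * 100.0

  // Quoted figures: mine is a lower bound ("160,000+", "over 13,000,000") and
  // PR-31570 an upper bound ("didn't reach ..."), so both ratios are lower bounds.
  val plentyOfRowsInSession: Double = speedup(160000, 60000) // about 2.67x
  val plentyOfKeys: Double = speedup(13000000, 9000000)      // about 1.44x
}
```

With these inputs the bounds come out at roughly 2.67x for "plenty of rows in session" and 1.44x for "plenty of keys".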
   
   I'd appreciate it if reviewers could run performance tests on their side and share the results. I'd love to see results that contradict my perf tests (either my tests with a different env/config, or new tests), but if no one shows results contradicting mine, I guess we all know which direction we need to put our effort into.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781971585


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135256/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780099026


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39761/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781870068


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39836/
   




[GitHub] [spark] viirya commented on a change in pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r576596507



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -262,6 +262,17 @@ object AggUtils {
    *  - PartialMerge (now there is at most 1 tuple per group)
    *  - StateStoreSave (saves the tuple for the next batch)
    *  - Complete (output the current result of the aggregation)
+   *
+   * Plans a streaming aggregation with Session Window using the following progression:
+   *  - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
+   *  - Partial Aggregation (now there is at most 1 tuple per group)
+   *  - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from

Review comment:
       This process was simplified compared with the original PR. I removed an unnecessary `PartialMerge`, so the session window streaming aggregation has the same physical structure as the general streaming aggregation.
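As a rough, non-Spark illustration of the progression in the doc comment, the sketch below shows partial aggregation collapsing one batch to at most one tuple per group and then combining each group with an optional tuple restored from state. The `GroupKey` type, the count aggregate and the plain `Map` standing in for the state store are all assumptions; merging of overlapping sessions is deliberately left out.

```scala
object AggProgressionSketch {
  // Hypothetical grouping key: the session key plus the start of its assigned session window.
  final case class GroupKey(key: String, sessionStart: Long)

  /** Partial aggregation: collapse this batch's rows to at most one count per group. */
  def partialAggregate(rows: Seq[GroupKey]): Map[GroupKey, Long] =
    rows.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  /**
   * Restore + merge: each group has one tuple from this batch and optionally one tuple saved
   * by an earlier batch (a plain Map stands in for the state store). The result is what a
   * save step would write back for the groups touched in this batch.
   */
  def restoreAndMerge(
      batch: Map[GroupKey, Long],
      state: Map[GroupKey, Long]): Map[GroupKey, Long] =
    batch.map { case (k, count) => k -> (count + state.getOrElse(k, 0L)) }
}
```

Without a pre-shuffle `PartialMerge`, the only per-batch collapsing step here is `partialAggregate`, which runs after the rows have already been assigned to windows.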






[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780750653


   **[Test build #135201 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135201/testReport)** for PR 31570 at commit [`a95c5be`](https://github.com/apache/spark/commit/a95c5be7b744ce180d255afc6fb8ff1f6c0c7569).




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781971585


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135256/
   




[GitHub] [spark] HeartSaVioR edited a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-803946728


   I can provide a new PR based on mine + the state format proposed here. (Of course, I'm willing to add both of you as co-authors.)
   
   Right now I'm working on update mode (required) and possibly on supporting global aggregation in streaming queries (optional), which might take a couple of days, but I can raise a WIP PR and keep working on it.




[GitHub] [spark] viirya commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779691397


   cc @xuanyuanking 
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800944282








[GitHub] [spark] viirya edited a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-801230726


   @xuanyuanking @HeartSaVioR A state store format config was added, and the state store format was revised to use two stores instead of one long array holding all states together. I'd appreciate it if you have time to look at this.
   
   cc @gaborgsomogyi @zsxwing @tdas @Ngone51 
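The two-store layout can be pictured roughly as follows. This is a hypothetical sketch loosely modeled on the streaming-join state layout (one store from key to number of values, one from (key, index) to value). The actual store names and schemas in this PR may differ, and in-memory maps stand in for real state stores.

```scala
import scala.collection.mutable

object SessionStateSketch {
  // Hypothetical session value: window [start, end) plus an aggregated count.
  final case class Session(start: Long, end: Long, count: Long)

  /** Two in-memory maps standing in for two state stores (hypothetical layout). */
  final class TwoStoreSessionState {
    // Store 1: grouping key -> number of sessions currently kept for that key.
    private val keyToNumValues = mutable.Map.empty[String, Int]
    // Store 2: (grouping key, index) -> one session; indices follow start-time order.
    private val keyWithIndexToValue = mutable.Map.empty[(String, Int), Session]

    /** Replaces all sessions of `key`; `sessions` is assumed to be sorted by start. */
    def put(key: String, sessions: Seq[Session]): Unit = {
      val oldCount = keyToNumValues.getOrElse(key, 0)
      (0 until oldCount).foreach(i => keyWithIndexToValue.remove((key, i)))
      sessions.zipWithIndex.foreach { case (s, i) => keyWithIndexToValue((key, i)) = s }
      if (sessions.isEmpty) keyToNumValues.remove(key)
      else keyToNumValues(key) = sessions.size
    }

    /** Reads the sessions of `key` back in index (start-time) order. */
    def get(key: String): Seq[Session] =
      (0 until keyToNumValues.getOrElse(key, 0)).map(i => keyWithIndexToValue((key, i)))

    /** Total number of per-session entries across all keys. */
    def numEntries: Int = keyWithIndexToValue.size
  }
}
```

In this layout each session is its own entry, rather than all sessions for a key being packed into a single array value.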




[GitHub] [spark] HeartSaVioR commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-804504665


   Just submitted mine - #31937
   Update mode is addressed. Global aggregation is not yet supported for streaming queries, though it is still supported for batch queries, so it probably doesn't matter much.
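The difference between the two output modes for sessions can be sketched as follows. Append mode emits a session only once it can no longer change (the watermark has passed its end). Update mode emits every session whose aggregate changed in the batch. This is the general Structured Streaming output-mode semantics applied to sessions. The `Session` type, the `changed` set and the `end <= watermark` eviction rule are assumptions of this sketch, not code from either PR.

```scala
object EmitSketch {
  // Hypothetical merged session: [start, end) where end = last event time + gap.
  final case class Session(key: String, start: Long, end: Long, count: Long)

  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode

  /**
   * Returns (rows emitted this batch, sessions kept in state) for the merged sessions of one
   * micro-batch. `changed` holds the sessions whose aggregates changed in this batch.
   */
  def emit(
      mode: Mode,
      merged: Seq[Session],
      changed: Set[Session],
      watermark: Long): (Seq[Session], Seq[Session]) = {
    // Sessions whose end is at or behind the watermark can no longer grow, so they are evicted.
    val (finished, open) = merged.partition(_.end <= watermark)
    mode match {
      case Append => (finished, open)                        // only final results
      case Update => (merged.filter(changed.contains), open) // every changed session
    }
  }
}
```

In update mode the same session may be emitted in several batches as it grows, which is why it needs separate handling from append mode.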




[GitHub] [spark] viirya commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-801230726


   @xuanyuanking @HeartSaVioR The state store format was added and revised to use two stores instead of one long array holding all states together. I'd appreciate it if you have time to look at this.
   
   cc @gaborgsomogyi @zsxwing @tdas @Ngone51 




[GitHub] [spark] xuanyuanking commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596491649



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreHandler.scala
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
+import org.apache.spark.sql.types.StructType
+
+trait StateStoreType
+
+/** Helper trait for invoking common functionalities of a state store. */
+abstract class StateStoreHandler extends Logging {

Review comment:
       It would be great if we could describe in a bit more detail how we reuse the original streaming join `StateStoreHandler`. It seems to be an important implementation detail.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -262,6 +262,17 @@ object AggUtils {
    *  - PartialMerge (now there is at most 1 tuple per group)
    *  - StateStoreSave (saves the tuple for the next batch)
    *  - Complete (output the current result of the aggregation)
+   *
+   * Plans a streaming aggregation with Session Window using the following progression:
+   *  - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
+   *  - Partial Aggregation (now there is at most 1 tuple per group)
+   *  - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from

Review comment:
       Yes. If we see a bottleneck in the future, we can add the pre-shuffle PartialMerge back.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowMergeExec.scala
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * The physical plan for a streaming query that merges session windows after restoring them
+ * from the state store.
+ * Note: the end time of a window restored from the state store already includes the session
+ * windowGap.
+ *
+ * @param windowExpressions
+ * @param sessionSpec
+ * @param child
+ */
+case class SessionWindowMergeExec(
+    windowExpressions: NamedExpression,
+    sessionSpec: Seq[Expression],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  // Data should be sorted, so we can merge session window directly.
+  // TODO: use this requirement for simplicity, not necessary to sort the whole dataset,
+  // try better way later.

Review comment:
       It seems we can already get rid of this ordering, because the session windows in the state store are already in order.
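If both inputs are already ordered by start time, restored and new sessions can be merged in one linear pass instead of a full sort. Here is a minimal sketch for a single key. It assumes half-open windows [start, end) whose end already includes the gap (as the doc comment notes for restored windows), and it treats two windows as mergeable when the next start is strictly before the current end. `Session` and `mergeSorted` are hypothetical names, not the PR's code.

```scala
object SessionMergeSketch {
  // Hypothetical session for one key: [start, end) with end = last event time + gap.
  final case class Session(start: Long, end: Long, count: Long)

  /** Merges two start-ordered session lists in one linear pass (no full sort). */
  def mergeSorted(restored: List[Session], fresh: List[Session]): List[Session] = {
    // Sorted-merge of the two inputs, like the merge step of merge sort.
    @annotation.tailrec
    def interleave(a: List[Session], b: List[Session], acc: List[Session]): List[Session] =
      (a, b) match {
        case (Nil, _) => acc.reverse ::: b
        case (_, Nil) => acc.reverse ::: a
        case (x :: xs, y :: ys) =>
          if (x.start <= y.start) interleave(xs, b, x :: acc) else interleave(a, ys, y :: acc)
      }

    // Coalesce neighbours that overlap (next.start < current.end), summing their counts.
    interleave(restored, fresh, Nil).foldLeft(List.empty[Session]) {
      case (cur :: done, next) if next.start < cur.end =>
        Session(cur.start, math.max(cur.end, next.end), cur.count + next.count) :: done
      case (acc, next) => next :: acc
    }.reverse
  }
}
```

For example, merging restored `[0, 10)` and `[30, 40)` with new `[5, 15)` and `[50, 60)` yields `[0, 15)` (count 3), `[30, 40)` and `[50, 60)`.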

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * the child to be distributed by sessionSpec and sorted by the time column within each partition.
+ * The value for window start is the time value of the first row in this window; the value for
+ * window end is the time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, i.e. the remaining columns of
+ *                    groupingExpr in the parent aggregate node.
+ * @param windowGap window gap in microseconds.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       This is another difference from @HeartSaVioR's approach. We use the SQL window function pattern, while Jungtaek's is a subclass of the aggregation iterator.
   Pros:
   Less code change. Doesn't mess with the aggregation logic.
   Cons:
   We keep the original rows around for a while, which might involve more memory cost.
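The window-function style assignment described in the `SessionWindowExec` doc comment can be sketched in plain Scala. Rows arrive sorted by (key, time). A session's start is the first row's time and its end is the last row's time plus the gap. Each original row is tagged with its window, so the rows of an open session must be buffered until the session closes, which is the memory cost mentioned above. `Row`, `Tagged`, `assign` and the strict `<` gap comparison are assumptions of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

object SessionAssignSketch {
  // Hypothetical input row: a session key plus an event time (microseconds, as in the doc).
  final case class Row(key: String, time: Long)
  final case class Tagged(row: Row, sessionStart: Long, sessionEnd: Long)

  /** `sorted` must be ordered by (key, time), mirroring the required child ordering. */
  def assign(sorted: Seq[Row], gap: Long): Seq[Tagged] = {
    val out = ArrayBuffer.empty[Tagged]
    val buffer = ArrayBuffer.empty[Row] // rows of the session that is still open

    // Closes the open session: start = first row's time, end = last row's time + gap.
    def flush(): Unit = if (buffer.nonEmpty) {
      val start = buffer.head.time
      val end = buffer.last.time + gap
      buffer.foreach(r => out += Tagged(r, start, end))
      buffer.clear()
    }

    sorted.foreach { r =>
      val continues =
        buffer.nonEmpty && buffer.last.key == r.key && r.time < buffer.last.time + gap
      if (!continues) flush()
      buffer += r
    }
    flush()
    out.toSeq
  }
}
```

An aggregation-iterator design could instead fold each row into a running aggregate and drop it, avoiding the buffer at the cost of touching the aggregation code.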






[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780437998


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39776/
   




[GitHub] [spark] viirya edited a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779996209


   This approach is intuitive, so the change footprint is relatively small. It is also easier to maintain. Performance-wise, it is on par with the alternative one.
   
   I optimized it further from the original implementation. There are some things I think we can still work on, e.g.:
   
   * isolating the state store format into a manager class, so we can change the state store format if we need to
   * optimizing the state store format
   * dynamic session window gap
   * ...etc.
    
   I would not do that here until we reach a consensus on whether we all agree to pursue this approach.
   
   BTW, I may also take time to play with the alternative approach.
   
   cc @dbtsai 
   
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596641133



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       I think it's valid to compare this with UpdatingSessionIterator, which is used to support aggregation with one distinct aggregate and technically does the same thing as this. The major difference between the two seems to be that UpdatingSessionIterator doesn't memorize entire rows: it only memorizes the value part of each row, since the key part should be the same for every row in the current session. When the session is closed, it restores the memorized rows from the key and value parts.
   
   I guess each has pros and cons, but neither stands out much. Since this one is at least simpler, I'm OK with adopting it as the replacement for UpdatingSessionIterator. I feel that MergingSessionsIterator is something we should revisit, but as I said, if we feel that's a blocker to moving forward, I'll take it up after this lands in the codebase.
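To make the comparison concrete, here is a minimal sketch of the buffering strategy, under stated assumptions. The input is already sorted by (key, time), as `SessionWindowExec` requires of its child. Times are in milliseconds, not the microseconds used by `windowGap`. The names `Event`, `SessionRow` and `SessionAssigner` are hypothetical. Following the `UpdatingSessionIterator` idea, only the (time, value) part of each row is buffered, and the key is restored when the session closes. Window start is the first row's time and window end is the last row's time plus the gap, as the `SessionWindowExec` Scaladoc describes. This is not Spark's implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical input row: a session key, an event time in milliseconds, and a value.
final case class Event(key: String, timeMs: Long, value: Int)

// Hypothetical output row: the input fields plus the assigned session window [start, end).
final case class SessionRow(key: String, timeMs: Long, value: Int, start: Long, end: Long)

object SessionAssigner {
  /**
   * Assigns a session window to every event. Assumes `sorted` is ordered by (key, timeMs).
   * Only the value part (timeMs, value) of each row is buffered; the key is stored once per
   * session and restored when the session closes. Output is collected eagerly for brevity.
   */
  def assign(sorted: Iterator[Event], gapMs: Long): List[SessionRow] = {
    val out = ArrayBuffer.empty[SessionRow]
    val buffered = ArrayBuffer.empty[(Long, Int)] // (timeMs, value) of the open session
    var currentKey: Option[String] = None
    var sessionStart = 0L
    var lastTime = 0L

    // Closes the open session: end = last row's time + gap, then re-attach the key to each row.
    def flush(): Unit = currentKey.foreach { key =>
      val end = lastTime + gapMs
      buffered.foreach { case (t, v) => out += SessionRow(key, t, v, sessionStart, end) }
      buffered.clear()
    }

    sorted.foreach { e =>
      // A row joins the open session if it has the same key and arrives before the session end.
      val joinsOpenSession = currentKey.contains(e.key) && e.timeMs < lastTime + gapMs
      if (!joinsOpenSession) {
        flush()
        currentKey = Some(e.key)
        sessionStart = e.timeMs
      }
      buffered += ((e.timeMs, e.value))
      lastTime = e.timeMs
    }
    flush()
    out.toList
  }
}
```

Take the rows from the `DataFrameSessionWindowingSuite` temp table (seconds 27, 34 and 56 past 19:39, written here as milliseconds) with an assumed 10-second gap. Key `a` forms two sessions, [27000, 44000) and [56000, 66000), and key `b` forms one.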






[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780013042


   **[Test build #135180 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135180/testReport)** for PR 31570 at commit [`b774140`](https://github.com/apache/spark/commit/b7741406a36182967bc4ab23374a7ba95c12d417).




[GitHub] [spark] HeartSaVioR commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-801566364


   Thanks for taking up the missing piece in SS! I'm happy to see us continue investing in this feature. Let me take a look.




[GitHub] [spark] viirya commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r597885781



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       That's right. It is a trade-off. For the session window use case, I imagine that in most cases the buffer should not hold too many rows. It's hard to think of a session window having enough rows to cause a serious problem; that sounds like a rare case to me.
   
   I think I understand the design thinking here. The approach `MergingSessionsIterator` takes increases complexity by mixing aggregation and session window operations. Without mixing in aggregation, I currently have no idea how to avoid buffering. I wouldn't worry too much about this before we have real customer complaints about the issue.
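For contrast, here is a minimal sketch of folding the aggregation into the same pass, which is the general idea behind `MergingSessionsIterator` as described in this thread. Only a running aggregate per open session is kept, so no rows are buffered. The cost is that session assignment is coupled to the aggregate. Assumptions: input sorted by (key, time), a count/sum aggregate, millisecond times, and the hypothetical names `Event`, `SessionAgg` and `MergingSessionSketch`. This is not Spark's code.

```scala
// Hypothetical input row and per-session aggregate result.
final case class Event(key: String, timeMs: Long, value: Int)
final case class SessionAgg(key: String, start: Long, end: Long, count: Long, sum: Long)

object MergingSessionSketch {
  /** Assumes `sorted` is ordered by (key, timeMs); keeps only one running aggregate at a time. */
  def aggregate(sorted: Iterator[Event], gapMs: Long): List[SessionAgg] = {
    val out = List.newBuilder[SessionAgg]
    var open: Option[SessionAgg] = None
    sorted.foreach { e =>
      open match {
        case Some(s) if s.key == e.key && e.timeMs < s.end =>
          // Extend the open session: its end moves to this row's time plus the gap.
          open = Some(s.copy(end = e.timeMs + gapMs, count = s.count + 1, sum = s.sum + e.value))
        case _ =>
          // Emit the previous session (if any) and start a new one from this row.
          open.foreach(out += _)
          open = Some(SessionAgg(e.key, e.timeMs, e.timeMs + gapMs, 1L, e.value.toLong))
      }
    }
    open.foreach(out += _)
    out.result()
  }
}
```

Memory stays constant per open session regardless of how many rows it contains, which is the property the buffering approach gives up in exchange for simplicity.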
   






[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784534508


   **[Test build #135388 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135388/testReport)** for PR 31570 at commit [`11dc533`](https://github.com/apache/spark/commit/11dc53376bcedb9090a707c313a60e063f03817f).




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780110770


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39761/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596534327



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       That may not just bring additional memory cost; it may also cause spilling, which is something we'd like to avoid at all costs. There's a trade-off between complexity and optimization.
   
   I'm OK with moving forward to make it work first and then evaluating the value of the trade-off. Except for the state format, we can change everything afterwards, so I'm OK with that.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.execution.SessionWindowExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StringType
+
+class DataFrameSessionWindowingSuite
+  extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper with BeforeAndAfterEach {
+
+  import testImplicits._
+
+  private def withTempTable(f: String => Unit): Unit = {
+    val tableName = "temp"
+    Seq(
+      ("2018-08-22 19:39:27", "a", 4),
+      ("2018-08-22 19:39:34", "a", 1),
+      ("2018-08-22 19:39:56", "a", 3),
+      ("2018-08-22 19:39:56", "b", 2)
+    ).toDF("time", "key", "value").createOrReplaceTempView(tableName)
+    try {
+      f(tableName)
+    } finally {
+      spark.catalog.dropTempView(tableName)
+    }
+  }
+
+  test("session window in SQL with single key as session window key") {
+    withTempTable { table =>
+      val a = spark.sql(

Review comment:
       dead code?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -262,6 +262,17 @@ object AggUtils {
    *  - PartialMerge (now there is at most 1 tuple per group)
    *  - StateStoreSave (saves the tuple for the next batch)
    *  - Complete (output the current result of the aggregation)
+   *
+   * Plans a streaming aggregation with Session Window using the following progression:
+   *  - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
+   *  - Partial Aggregation (now there is at most 1 tuple per group)
+   *  - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from

Review comment:
       As @xuanyuanking stated, we can call that a simplification, but it is debatable whether PartialMerge is "unnecessary". Aggregating before the shuffle requires a sort, so that is a trade-off, but it's well known that shuffling less data brings performance benefits.
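The shuffle-size argument can be illustrated with a small sketch. Suppose each partition first collapses its rows into partial sessions. Then only those partial sessions are shuffled, and after the shuffle any partial sessions of the same key whose [start, end) ranges overlap must be merged. The sketch below shows only that merge step. It assumes a count aggregate and uses the hypothetical names `PartialSession` and `PartialSessionMerge`. It does not model Spark's actual PartialMerge operator.

```scala
// Hypothetical partial session produced by pre-aggregating one partition before the shuffle.
final case class PartialSession(key: String, start: Long, end: Long, count: Long)

object PartialSessionMerge {
  /** Merges partial sessions of the same key whose [start, end) ranges overlap. */
  def merge(partials: Seq[PartialSession]): List[PartialSession] = {
    val sorted = partials.sortBy(p => (p.key, p.start))
    sorted.foldLeft(List.empty[PartialSession]) {
      // Overlaps the most recent session of the same key: widen it and combine the counts.
      case (cur :: done, p) if cur.key == p.key && p.start < cur.end =>
        cur.copy(end = math.max(cur.end, p.end), count = cur.count + p.count) :: done
      // Otherwise this partial session starts a new merged session.
      case (acc, p) => p :: acc
    }.reverse
  }
}
```

For example, suppose one partition holds the rows at 27 s and 56 s for key `a`, another holds the row at 34 s, and the gap is an assumed 10 s. Then the partial sessions [27000, 37000) and [34000, 44000) merge into [27000, 44000) with count 2.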






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780887435


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135201/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781962916


   **[Test build #135256 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135256/testReport)** for PR 31570 at commit [`fc3d122`](https://github.com/apache/spark/commit/fc3d1224bf2f66dd8c30bc58db1ade37bfdcad1e).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-873224341


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45109/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780813231


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39782/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800921997


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40734/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780110770


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39761/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800916851


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136156/
   




[GitHub] [spark] viirya commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779991885


   retest this please




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596514052



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3964,6 +3965,80 @@ object TimeWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replace the [[SessionWindowExpression]] in Aggregate node, this rule will add [[SessionWindow]]
+ * as the current Aggregate's new child. It will throw [[AnalysisException]] while
+ * [[SessionWindowExpression]] is the only column in group by.
+ */
+object ResolveSessionWindow extends Rule[LogicalPlan] {
+
+  private def hasWindowFunction(groupList: Seq[Expression]): Boolean =
+    groupList.exists(hasWindowFunction)
+
+  private def hasWindowFunction(expr: Expression): Boolean = {
+    expr.find {
+      case window: SessionWindowExpression => true
+      case _ => false
+    }.isDefined
+  }
+
+  private final val WINDOW_COL_NAME = "session_window"
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case p @ Aggregate(groupingExpr, aggregateExpr, _) if hasWindowFunction(groupingExpr) =>

Review comment:
       `hasWindowFunction` and `windowExpressions` should take time windows into account as well. I guess it'll be simpler to resolve this when we consolidate the two rules, but that depends on how much more complicated it becomes.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3964,6 +3965,80 @@ object TimeWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replace the [[SessionWindowExpression]] in Aggregate node, this rule will add [[SessionWindow]]
+ * as the current Aggregate's new child. It will throw [[AnalysisException]] while
+ * [[SessionWindowExpression]] is the only column in group by.
+ */
+object ResolveSessionWindow extends Rule[LogicalPlan] {

Review comment:
       I know the logic in TimeWindowing is a bit long and you may not want to add more complexity there, but I feel this still has to be consolidated with TimeWindowing (which means TimeWindowing needs a bit of refactoring once the session window logic is added). There are lots of similarities, and the limitations apply to both; for example, the restriction that allows only one time window should also take session windows into account, and vice versa.
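Here is a minimal sketch of what a consolidated check could look like, using a toy expression tree rather than Catalyst. Window expressions of both kinds are collected from the grouping list and counted together. The "session window as the only group by column" rule from this diff is kept. All types and names (`Expr`, `Col`, `TimeWindowExpr`, `SessionWindowExpr`, `WindowChecks`) are hypothetical, and so is the multiple-window error message. Only the session-only message is taken from the diff.

```scala
// Hypothetical toy expression tree standing in for Catalyst expressions.
sealed trait Expr { def children: Seq[Expr] }
final case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
final case class TimeWindowExpr(time: Expr) extends Expr { def children: Seq[Expr] = Seq(time) }
final case class SessionWindowExpr(time: Expr) extends Expr { def children: Seq[Expr] = Seq(time) }

object WindowChecks {
  // Collects every time or session window expression in the tree rooted at `e`.
  private def collectWindows(e: Expr): Seq[Expr] = {
    val self = e match {
      case _: TimeWindowExpr | _: SessionWindowExpr => Seq(e)
      case _ => Nil
    }
    self ++ e.children.flatMap(collectWindows)
  }

  /** Validates a grouping list, counting both window kinds against the same limit. */
  def validate(grouping: Seq[Expr]): Either[String, Unit] = {
    val windows = grouping.flatMap(collectWindows).distinct
    val hasSessionWindow = windows.exists(_.isInstanceOf[SessionWindowExpr])
    // Mirrors `groupingExpr.filterNot(hasWindowFunction)` being empty in the diff.
    val noOtherGroupingColumns = grouping.forall(g => collectWindows(g).nonEmpty)
    if (windows.size > 1) Left("Multiple time/session window expressions are not supported.")
    else if (hasSessionWindow && noOtherGroupingColumns)
      Left("Cannot use session_window as the only group by column.")
    else Right(())
  }
}
```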

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
##########
@@ -94,7 +94,47 @@ case class TimeWindow(
   }
 }
 
-object TimeWindow {
+case class SessionWindowExpression(

Review comment:
       I feel it's OK to just name it `SessionWindow`, as we do for `TimeWindow`, but if we feel that's not clear, we should rename `TimeWindow` to `TimeWindowExpression` as well for consistency.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3964,6 +3965,80 @@ object TimeWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replace the [[SessionWindowExpression]] in Aggregate node, this rule will add [[SessionWindow]]
+ * as the current Aggregate's new child. It will throw [[AnalysisException]] while
+ * [[SessionWindowExpression]] is the only column in group by.
+ */
+object ResolveSessionWindow extends Rule[LogicalPlan] {
+
+  private def hasWindowFunction(groupList: Seq[Expression]): Boolean =
+    groupList.exists(hasWindowFunction)
+
+  private def hasWindowFunction(expr: Expression): Boolean = {
+    expr.find {
+      case window: SessionWindowExpression => true
+      case _ => false
+    }.isDefined
+  }
+
+  private final val WINDOW_COL_NAME = "session_window"

Review comment:
       I see that the meta fields (start, end) are handled slightly differently from the time window. Would the difference cause trouble when we consolidate the two into one? I have no specific preference, but I'd like both to be consistent so that we don't have to deal with the differences during maintenance.
   
   That said, let's either change both (OK to do it in a follow-up PR) or follow the current approach.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3964,6 +3965,80 @@ object TimeWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replace the [[SessionWindowExpression]] in Aggregate node, this rule will add [[SessionWindow]]
+ * as the current Aggregate's new child. It will throw [[AnalysisException]] while
+ * [[SessionWindowExpression]] is the only column in group by.
+ */
+object ResolveSessionWindow extends Rule[LogicalPlan] {
+
+  private def hasWindowFunction(groupList: Seq[Expression]): Boolean =
+    groupList.exists(hasWindowFunction)
+
+  private def hasWindowFunction(expr: Expression): Boolean = {
+    expr.find {
+      case window: SessionWindowExpression => true
+      case _ => false
+    }.isDefined
+  }
+
+  private final val WINDOW_COL_NAME = "session_window"
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case p @ Aggregate(groupingExpr, aggregateExpr, _) if hasWindowFunction(groupingExpr) =>
+      val child = p.child
+      val windowExpressions =
+        p.expressions.flatMap(_.collect { case t: SessionWindowExpression => t }).toSet
+
+      val numWindowExpr = windowExpressions.size
+      // Only support a single session window expression for now
+      if (numWindowExpr == 1 &&
+        windowExpressions.head.timeColumn.resolved &&
+        windowExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val window = windowExpressions.head
+
+        val metadata = window.timeColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        val windowAttr = AttributeReference(
+          WINDOW_COL_NAME, window.dataType, metadata = metadata)()
+
+        // check partitionExpression in groupingExpr
+        val partitionExpression = groupingExpr.filterNot(hasWindowFunction)
+        if (partitionExpression.isEmpty) {
+          p.failAnalysis("Cannot use session_window as the only group by column.")

Review comment:
       I feel this is less clear; it's probably better to mention that additional key column(s) are required, e.g. "Cannot use session_window without additional key column(s)", or, if we assume end users know the concept of "global aggregation", "Cannot apply session_window on global aggregation".
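As a rough illustration of the check being discussed (strip the session window out of the grouping expressions and fail if no key column remains), here is a minimal sketch. It assumes a hypothetical, flattened model: `GroupingExpr`, `Column`, `SessionWindowCall` and `validate` are illustrative names, not Catalyst classes, and the real rule searches nested expressions with `find` rather than matching only top-level ones. The error text uses the wording suggested in the comment above.

```scala
// Hypothetical, flattened stand-ins for Catalyst grouping expressions (not Spark APIs).
object SessionWindowValidation {
  sealed trait GroupingExpr
  final case class Column(name: String) extends GroupingExpr
  final case class SessionWindowCall(timeColumn: String, gapDuration: String) extends GroupingExpr

  /** Keeps the non-window grouping keys and fails if none remain (i.e. a global aggregation). */
  def validate(grouping: Seq[GroupingExpr]): Either[String, Seq[GroupingExpr]] = {
    val keys = grouping.filter {
      case _: SessionWindowCall => false
      case _ => true
    }
    if (keys.isEmpty) Left("Cannot use session_window without additional key column(s)")
    else Right(keys)
  }
}
```

Returning `Either` instead of throwing keeps the sketch testable; in the analyzer rule the `Left` branch would correspond to `failAnalysis`.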

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -673,6 +673,19 @@ case class Window(
   def windowOutputSet: AttributeSet = AttributeSet(windowExpressions.map(_.toAttribute))
 }
 
+case class SessionWindow(

Review comment:
       Ah OK this is conflicting with SessionWindowExpression when renaming. OK to leave as it is.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowMergeExec.scala
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * The physical plan for streaming query, merge session window after restore from state store.
+ * Note: the end time of window that restore from statestore has already contain session windowGap
+ *
+ * @param windowExpressions

Review comment:
       Add an explanation for each param, or simply remove them.
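The scaladoc of `SessionWindowMergeExec` notes that the end time of a window restored from the state store already contains the gap. One way to see why that matters: when restored and newly computed sessions for the same key are merged, overlapping sessions are combined by taking the maximum end, and the gap must not be added a second time. A minimal sketch for a single session key, assuming a hypothetical `Session(start, end)` model with an exclusive end (illustrative names, not Spark's implementation):

```scala
// Hypothetical model for a single session key; names are illustrative, not Spark APIs.
object SessionMerge {
  // `end` already includes the gap, for restored and newly computed sessions alike.
  final case class Session(start: Long, end: Long)

  def merge(restored: Seq[Session], fresh: Seq[Session]): List[Session] =
    (restored ++ fresh).sortBy(_.start).foldLeft(List.empty[Session]) {
      // Overlap: extend the current session without adding the gap again.
      case (cur :: done, next) if next.start < cur.end =>
        Session(cur.start, math.max(cur.end, next.end)) :: done
      // No overlap: start a new session.
      case (acc, next) => next :: acc
    }.reverse
}
```

Treating `end` as exclusive means two sessions that merely touch (`next.start == cur.end`) stay separate; that boundary choice is an assumption of the sketch.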

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

Review comment:
       This made me stop and think; it looks like this is guaranteed per micro-batch. It would be ideal to leave a brief explanation of how it is guaranteed.
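The `SessionWindowExec` scaladoc says the window start is the time of the first row in the session and the window end is the time of the last row plus `windowGap`, given input clustered by `sessionSpec` and sorted by the time column. The sketch below models that contract with hypothetical `Event`/`Session` case classes (not Spark types). It also hints at one reading of the ordering question: rows are only annotated and emitted in input order, so whatever ordering the child had still holds.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model: Event/Session/assign are illustrative names, not Spark APIs.
object SessionAssignment {
  final case class Event(key: String, time: Long)
  final case class Session(key: String, start: Long, end: Long)

  /** Assumes `events` are clustered by key and sorted by time within each key,
   *  i.e. the requiredChildDistribution/requiredChildOrdering contract. */
  def assign(events: IndexedSeq[Event], gap: Long): Seq[(Event, Session)] = {
    val out = ArrayBuffer.empty[(Event, Session)]
    var i = 0
    while (i < events.length) {
      val key = events(i).key
      // Extend the run while the key matches and the next event falls inside the gap.
      var j = i + 1
      while (j < events.length && events(j).key == key &&
          events(j).time < events(j - 1).time + gap) {
        j += 1
      }
      // start = first event time; end = last event time + gap.
      val session = Session(key, events(i).time, events(j - 1).time + gap)
      // Rows are emitted in input order, so the child's ordering is preserved.
      (i until j).foreach(k => out += ((events(k), session)))
      i = j
    }
    out.toSeq
  }
}
```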

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -348,7 +348,6 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           rewrittenResultExpressions,
           stateVersion,
           planLater(child))
-

Review comment:
       nit: unnecessary change

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3964,6 +3965,80 @@ object TimeWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replace the [[SessionWindowExpression]] in Aggregate node, this rule will add [[SessionWindow]]
+ * as the current Aggregate's new child. It will throw [[AnalysisException]] while
+ * [[SessionWindowExpression]] is the only column in group by.
+ */
+object ResolveSessionWindow extends Rule[LogicalPlan] {

Review comment:
       Or, at least counting windows should consider both.
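Counting windows across both kinds, as suggested here and in the earlier comment on consolidating with TimeWindowing, could look roughly like the sketch below: collect every time or session window expression in the tree, deduplicate, and reject more than one. The tree (`Expr`, `Col`, `TimeWindowExpr`, `SessionWindowExpr`) and the error text are hypothetical, not Catalyst classes or Spark's actual message.

```scala
// Hypothetical expression tree; names are illustrative, not Catalyst classes.
object WindowCounting {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Col(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class TimeWindowExpr(time: Expr) extends Expr { def children: Seq[Expr] = Seq(time) }
  final case class SessionWindowExpr(time: Expr) extends Expr { def children: Seq[Expr] = Seq(time) }

  // Collects window expressions of either kind anywhere in the tree.
  def windows(e: Expr): Seq[Expr] = {
    val self: Seq[Expr] = e match {
      case _: TimeWindowExpr | _: SessionWindowExpr => Seq(e)
      case _ => Nil
    }
    self ++ e.children.flatMap(windows)
  }

  /** Rejects plans whose distinct window expressions, counted across both kinds, exceed one. */
  def checkSingleWindow(exprs: Seq[Expr]): Either[String, Unit] = {
    val n = exprs.flatMap(windows).distinct.size
    if (n > 1) Left(s"Found $n window expressions; only one time or session window is supported")
    else Right(())
  }
}
```

Deduplicating before counting mirrors the `.toSet` in the rule under review, so the same window referenced twice still counts once.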

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**

Review comment:
       The comment doesn't seem to be needed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800892816


   **[Test build #136153 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136153/testReport)** for PR 31570 at commit [`cf65c2a`](https://github.com/apache/spark/commit/cf65c2a06e5634c313ce6db4483dcbdc4f5e3030).




[GitHub] [spark] HeartSaVioR commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-803946728


   I can provide a new PR based on mine + the proposed state format here. (Of course, I'm willing to add both of you as co-authors.)
   
   Right now I'm dealing with update mode (must do) and probably supporting global aggregation on streaming queries (optional), which might take a couple of days, but I can raise a WIP PR and continue working on it.




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800944283


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/40734/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784585941


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39968/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780767749


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39782/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800988676


   **[Test build #136153 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136153/testReport)** for PR 31570 at commit [`cf65c2a`](https://github.com/apache/spark/commit/cf65c2a06e5634c313ce6db4483dcbdc4f5e3030).
    * This patch passes all tests.
    * This patch **does not merge cleanly**.
    * This patch adds the following public classes _(experimental)_:
     * `  case class WindowRecord(start: Long, end: Long, isNew: Boolean, row: UnsafeRow)`




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-873224341


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/45109/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800940205


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40734/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780474823


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39776/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780013042


   **[Test build #135180 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135180/testReport)** for PR 31570 at commit [`b774140`](https://github.com/apache/spark/commit/b7741406a36182967bc4ab23374a7ba95c12d417).




[GitHub] [spark] xuanyuanking edited a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
xuanyuanking edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-804069039


   @viirya @HeartSaVioR Agree with both of you. I'm also running the local benchmark. My only concern is the `MergingSessionsIterator` details; it would be great if we could separate that code in the first stage and maybe refactor it later.
   
   ```
   I'm dealing with update mode (required)
   ```
   If it takes too much time, I think the append mode and complete mode (maybe also optional) are good enough.
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596641133



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       I think it's valid to compare this with UpdatingSessionIterator, which is used to support aggregation with one distinct and technically does the same thing as this. The major difference between the two seems to be that UpdatingSessionIterator doesn't try to memorize the entire row: it only memorizes the value part, since the key part should be the same across the current session, and when the session is closed, it restores the memorized rows from the key and value parts.
   
   I guess each has pros and cons, but neither stands out, and since this one is at least simpler, I'm OK to pick it up as the replacement for UpdatingSessionIterator. I feel MergingSessionsIterator is something we should revisit, but as I said, if we feel that's a blocker to moving forward, I'll take it up after this lands in the codebase.
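The buffering pattern attributed to UpdatingSessionIterator above (memorize only the value part, since the key is shared within a session, then rebuild full rows once the session closes) can be sketched as an iterator. This is a hypothetical model: `Row`, `Out` and `annotate` are illustrative names, not Spark's classes, and rows are assumed to be clustered by key and sorted by time.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model; Row/Out/annotate are illustrative names, not Spark's iterator classes.
object ValueOnlyBuffering {
  final case class Row(key: String, time: Long, value: Int)
  final case class Out(key: String, start: Long, end: Long, time: Long, value: Int)

  /** Input must be clustered by key and sorted by time within each key. */
  def annotate(rows: Iterator[Row], gap: Long): Iterator[Out] = new Iterator[Out] {
    private val input = rows.buffered
    private var pending: Iterator[Out] = Iterator.empty

    override def hasNext: Boolean = pending.hasNext || input.hasNext

    override def next(): Out = {
      if (!pending.hasNext) {
        val first = input.next()
        // Buffer only (time, value); the key is shared by every row in the session.
        val buf = ArrayBuffer((first.time, first.value))
        var last = first.time
        while (input.hasNext && input.head.key == first.key && input.head.time < last + gap) {
          val r = input.next()
          buf += ((r.time, r.value))
          last = r.time
        }
        val end = last + gap
        // Session closed: rebuild full rows from the shared key plus buffered values.
        pending = buf.iterator.map { case (t, v) => Out(first.key, first.time, end, t, v) }
      }
      pending.next()
    }
  }
}
```

The trade-off the comment hints at is visible here: the buffer holds one session's values rather than whole rows, at the cost of reassembling rows on output.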






[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780111186


   **[Test build #135181 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135181/testReport)** for PR 31570 at commit [`e1bb4af`](https://github.com/apache/spark/commit/e1bb4af4c08e6139d7b4d9578327ec7ac83b1623).




[GitHub] [spark] viirya edited a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-803912957


   Thanks for re-evaluating two approaches. It is valuable.
   
   Basically, by leveraging the new state store format, the two previous efforts are now pretty close, except for how they handle session merging. I can easily replace some exec nodes here with the ones from the other PR.
   
   No worries. The precondition for picking the simpler approach is that the two approaches have similar performance. I remember this was claimed in the JIRA. The re-evaluation gives us different numbers.
   
   I ran the benchmark locally. Due to differences between machines I cannot reproduce the same numbers, but I can see a significant difference between the two approaches, i.e., 1) merging then aggregating, and 2) merging with aggregating.
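To make the two approaches concrete, here is a minimal sketch for a single key with events sorted by time. It is a hypothetical model (`Event`, `Agg` and both function names are illustrative, and the aggregate is just count and sum), not the code from either PR, and it says nothing about the benchmark numbers. The first function materializes each session's events before aggregating them; the second updates a running aggregate while merging, which is the shape the comment associates with `MergingSessionsIterator`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model for one key with events sorted by time; names are illustrative.
object SessionAggregation {
  final case class Event(time: Long, value: Long)
  final case class Agg(start: Long, end: Long, count: Long, sum: Long)

  // 1) Merging then aggregating: buffer each session's events, then aggregate each group.
  def mergeThenAggregate(events: Seq[Event], gap: Long): Seq[Agg] = {
    val sessions = ArrayBuffer.empty[Vector[Event]]
    events.foreach { e =>
      if (sessions.nonEmpty && e.time < sessions.last.last.time + gap)
        sessions(sessions.length - 1) = sessions.last :+ e
      else sessions += Vector(e)
    }
    sessions.toSeq.map(s => Agg(s.head.time, s.last.time + gap, s.size.toLong, s.map(_.value).sum))
  }

  // 2) Merging with aggregating: fold each event into the open session's aggregate directly.
  def mergeWithAggregate(events: Seq[Event], gap: Long): Seq[Agg] =
    events.foldLeft(Vector.empty[Agg]) { (acc, e) =>
      acc.lastOption match {
        case Some(cur) if e.time < cur.end =>
          acc.init :+ cur.copy(end = e.time + gap, count = cur.count + 1, sum = cur.sum + e.value)
        case _ => acc :+ Agg(e.time, e.time + gap, 1L, e.value)
      }
    }
}
```

Both produce the same result; the difference is that the second never holds a session's raw events, which is one plausible source of the performance gap reported above.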
    
   I think we have a few options.
   
   1. Replace with merging with aggregating (`MergingSessionsIterator`). I'm trying it locally to see if we can get similar numbers. It would also be good if @HeartSaVioR would like to create a PR against this, so it is easier to incorporate commits authored by all parties. It is also fine if @HeartSaVioR wants to work on it after this is merged.
   2. Switch to the other previous effort + new state store format.
   
   Either works for me. Actually, the two options are basically the same logic to me, except for some cosmetic differences.
   
   @xuanyuanking WDYT?
   




[GitHub] [spark] xuanyuanking commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-801547775


   I'm reviewing this. Thank you @viirya !!




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781876774


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39836/
   



[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780225612


   **[Test build #135181 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135181/testReport)** for PR 31570 at commit [`e1bb4af`](https://github.com/apache/spark/commit/e1bb4af4c08e6139d7b4d9578327ec7ac83b1623).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780886536


   **[Test build #135201 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135201/testReport)** for PR 31570 at commit [`a95c5be`](https://github.com/apache/spark/commit/a95c5be7b744ce180d255afc6fb8ff1f6c0c7569).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.



[GitHub] [spark] SparkQA commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-873221142


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45109/
   



[GitHub] [spark] viirya edited a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779996209


   This approach is intuitive, so the change footprint is relatively small. It is also easier to maintain. Performance-wise, it is on par with the alternative one.
   
   I optimized it further from the original implementation. There are some things I think we can still work on, e.g. isolating the state store format into a manager class so that we can change the state store format if needed, dynamic session window gaps, etc. I would not do that here until we reach a consensus on whether we all agree to pursue this approach.
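   As an illustration of isolating the state store format behind a manager class, here is a hypothetical sketch in which the operator only talks to a `SessionStateManager` trait and the physical layout is chosen by a version number. All names are assumptions, and a `mutable.Map` stands in for the real state store.

```scala
import scala.collection.mutable

final case class SessionInfo(start: Long, end: Long)

// Hypothetical manager that hides the physical state layout from the operator.
trait SessionStateManager {
  def put(key: String, sessions: Seq[SessionInfo]): Unit
  def get(key: String): Seq[SessionInfo]
}

// Format v1: all sessions of a key are stored together as one value.
final class SingleValueFormat extends SessionStateManager {
  private val store = mutable.Map.empty[String, Vector[SessionInfo]]
  def put(key: String, sessions: Seq[SessionInfo]): Unit = store(key) = sessions.toVector
  def get(key: String): Seq[SessionInfo] = store.getOrElse(key, Vector.empty)
}

// Format v2: one entry per session, keyed by (key, session start).
final class PerSessionFormat extends SessionStateManager {
  private val store = mutable.Map.empty[(String, Long), SessionInfo]
  def put(key: String, sessions: Seq[SessionInfo]): Unit = {
    // Drop the previous sessions of this key, then write the new ones.
    store.keys.filter(_._1 == key).toList.foreach(k => store.remove(k))
    sessions.foreach(s => store((key, s.start)) = s)
  }
  def get(key: String): Seq[SessionInfo] =
    store.collect { case ((k, _), s) if k == key => s }.toVector.sortBy(_.start)
}

object SessionStateManager {
  // The operator only asks for a version; changing the format means adding a case here.
  def forVersion(version: Int): SessionStateManager = version match {
    case 1 => new SingleValueFormat
    case 2 => new PerSessionFormat
    case v => throw new IllegalArgumentException(s"Unsupported state format version: $v")
  }
}
```

   Both layouts return the same sessions for a key, so switching formats would only touch the factory and not the operator logic.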
   
   BTW, I may also take time to play with the alternative approach.
   
   cc @dbtsai 
   
   



[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r597938585



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       That is one of the concerns. Another is that to buffer a row you need to "copy" it, which means every input row gets copied. I see there are multiple physical operators that buffer rows, which makes me wonder about the performance and resource usage.
   
   I'll need to check whether the performance is really on par with mine. I think the major complexity of mine came from the linked-list state format. Migrating the state format to the one agreed on here would reduce the complexity significantly, so after applying that change to mine, I think we need to re-evaluate both properly.
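   A minimal sketch of why buffering forces a copy, assuming an upstream operator that reuses one mutable row object across `next()` calls (Spark operators can do this, which is why `InternalRow.copy()` exists). `MutableRow` and `RowReuse` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for a mutable, reused row (not Spark's InternalRow).
final class MutableRow(var key: String, var time: Long) {
  def copy(): MutableRow = new MutableRow(key, time)
}

object RowReuse {
  // Mimics an operator that reuses one row object for every element it emits.
  def reusingIterator(data: Seq[(String, Long)]): Iterator[MutableRow] = {
    val shared = new MutableRow("", 0L)
    data.iterator.map { case (k, t) =>
      shared.key = k
      shared.time = t
      shared
    }
  }

  // Buffers rows without copying: every buffer slot points at the same object.
  def bufferWithoutCopy(rows: Iterator[MutableRow]): List[Long] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[MutableRow]
    rows.foreach(r => buf += r)
    buf.map(_.time).toList
  }

  // Buffers rows with a defensive copy: correct, but pays one allocation per row.
  def bufferWithCopy(rows: Iterator[MutableRow]): List[Long] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[MutableRow]
    rows.foreach(r => buf += r.copy())
    buf.map(_.time).toList
  }
}
```

   Without the copy, every buffered slot ends up pointing at the last row emitted; with it, the values are correct but each input row costs an extra allocation, which is the resource-usage concern raised above.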





[GitHub] [spark] SparkQA commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-873224326


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/45109/
   



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784703441


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135388/
   



[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780414291


   **[Test build #135195 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135195/testReport)** for PR 31570 at commit [`a95c5be`](https://github.com/apache/spark/commit/a95c5be7b744ce180d255afc6fb8ff1f6c0c7569).



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779599239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39741/
   



[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779718635







[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596641133



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       I think it's valid to compare this with UpdatingSessionIterator, which is used to support aggregation with one distinct and technically does the same thing as this. The major difference between the two seems to be that UpdatingSessionIterator doesn't try to memorize the entire row: it only memorizes the value part of each row, since the key part should be the same for the whole current session, and when the session is closed, it restores the memorized rows from the key and value parts.
   
   I guess there are pros and cons to both, but neither stands out much (memory usage may be better for UpdatingSessionIterator, but we have to pay the cost of restoring). Since this one is at least simpler, I'm OK with picking it up as the replacement for UpdatingSessionIterator. I feel that MergingSessionsIterator is something we should revisit, but as I said, if we feel that's a blocker for moving forward, I'll take it up after this lands in the codebase.
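   A minimal sketch of the value-only buffering described above, in plain Scala and assuming input sorted by (key, time). `InRow`, `OutRow` and `UpdateSessions` are hypothetical names, not Spark's `UpdatingSessionIterator`; each output row carries its session's start and end, with the end assumed to be the last event time plus the gap.

```scala
// Hypothetical sketch, not Spark's UpdatingSessionIterator.
final case class InRow(key: String, time: Long, value: Int)
final case class OutRow(key: String, sessionStart: Long, sessionEnd: Long, time: Long, value: Int)

object UpdateSessions {
  // Tags every row with its session [start, end), buffering only the non-key part
  // (time, value) of each row while the session is open.
  def annotate(sorted: Iterator[InRow], gap: Long): Iterator[OutRow] = {
    val in = sorted.buffered
    Iterator.continually(in).takeWhile(_.hasNext).flatMap { it =>
      val first = it.next()
      // Value-only buffer: the key is the same for every row in this session.
      val values = scala.collection.mutable.ArrayBuffer((first.time, first.value))
      var end = first.time + gap
      while (it.hasNext && it.head.key == first.key && it.head.time < end) {
        val r = it.next()
        values += ((r.time, r.value))
        end = r.time + gap // input is sorted by time, so the end only moves forward
      }
      val sessionEnd = end
      // Session closed: restore full rows from the shared key plus each buffered value part.
      values.iterator.map { case (t, v) => OutRow(first.key, first.time, sessionEnd, t, v) }
    }
  }
}
```

   The buffer holds only `(time, value)` pairs because the key is identical within a session; the price is rebuilding each full row when the session closes, which is the restore cost mentioned above.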





[GitHub] [spark] AmplabJenkins commented on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779579374


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135160/
   



[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780169983







[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780240326


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135181/
   



[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779717393







[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784585941


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39968/
   



[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780150105


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39762/
   



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780541394


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135195/
   



[GitHub] [spark] viirya commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780740068


   retest this please



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779579374


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135160/
   



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800989522


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136153/
   



[GitHub] [spark] viirya commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779996209


   This approach is intuitive, so the change footprint is relatively small. It is also easier to maintain. Performance-wise, it is on par with the alternative one.
   
   I optimized it further from the original implementation. There are some things I think we can still work on, e.g. isolating the state store format into a manager class so that we can change the state store format if needed, dynamic session window gaps, etc. I would not do that now until we reach a consensus on whether we all agree to pursue this approach.
   
   BTW, I may also take time to play with the alternative approach.
   
   cc @dbtsai 
   
   



[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780750653


   **[Test build #135201 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135201/testReport)** for PR 31570 at commit [`a95c5be`](https://github.com/apache/spark/commit/a95c5be7b744ce180d255afc6fb8ff1f6c0c7569).



[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779627205







[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r597938585



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * the child to be distributed by sessionSpec and sorted by sessionSpec and then by the time
+ * column within each partition. The window start is the time value of the first row in the
+ * window; the window end is the time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param timeColumn the time column used to order rows within each session key.
+ * @param sessionSpec the partition key of this session window, i.e. the remaining columns of
+ *                    groupingExpr in the parent aggregate node.
+ * @param windowGap window gap in microseconds.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       That is one of the concerns. Another concern is that buffering a row requires "copying" it, which means every input row gets copied (no matter how many rows are buffered at a given time). I see there are multiple physical operators that buffer rows, which makes me wonder about the performance and resource usage.
   
   I'll need to check whether the performance is really on par with mine. I think the major complexity of mine was introduced by the linked-list state format. Migrating the state format to the one agreed on here would reduce the complexity significantly, so after applying that change to mine, we could reevaluate both properly.
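A minimal Scala sketch of the window computation described in the `SessionWindowExec` Scaladoc, assuming rows are plain `(sessionKey, timeMicros)` pairs already sorted by key and then by time (mirroring `requiredChildOrdering`). `Session` and `SessionAssignment` are hypothetical names, not part of the PR, and treating a gap of exactly `windowGap` as the start of a new session is an assumption; the boundary handling in the actual `doExecute` is not shown in the diff.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of one session window: [start, end) in microseconds.
final case class Session(key: String, start: Long, end: Long)

object SessionAssignment {
  /**
   * Tags each row with its session window. Assumes `rows` is sorted by (key, time).
   * Window start = time of the first row in the session;
   * window end = time of the last row + gapMicros.
   */
  def assign(rows: Seq[(String, Long)], gapMicros: Long): List[((String, Long), Session)] = {
    require(gapMicros > 0, "gap must be positive")
    // Consecutive rows with the same key and less than `gapMicros` apart share a run.
    val runs = ArrayBuffer.empty[Vector[(String, Long)]]
    rows.foreach { case row @ (key, time) =>
      val extendsLast = runs.nonEmpty && {
        val (lastKey, lastTime) = runs.last.last
        // Assumption: a gap of exactly gapMicros starts a new session.
        lastKey == key && time - lastTime < gapMicros
      }
      if (extendsLast) runs(runs.length - 1) = runs.last :+ row
      else runs += Vector(row)
    }
    // Each run becomes one session; every row in the run gets the same window.
    runs.toList.flatMap { run =>
      val session = Session(run.head._1, run.head._2, run.last._2 + gapMicros)
      run.map(row => (row, session))
    }
  }
}
```

With a 10-microsecond gap, rows at 0 and 5 for key `a` fall into one session `[0, 15)`, while the row at 20 starts a new session `[20, 30)`.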




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780787501


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39782/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781851450


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39836/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779718635






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r597938585



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       That is one of the concerns. Another concern is that buffering a row requires "copying" it, which means every input row going through the buffer gets copied (no matter how many rows are buffered at a given time). I see there are multiple physical operators that buffer rows, which makes me wonder about the performance and resource usage.
   
   I'll need to check whether the performance is really on par with mine. I think the major complexity of mine was introduced by the linked-list state format. Migrating the state format to the one agreed on here would reduce the complexity significantly, so after applying that change to mine, we could reevaluate both properly.
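To illustrate why buffering forces a copy: an upstream iterator may hand out the same mutable row object on every `next()` call (the `key.copy` comment in this PR's `SessionWindowStateStoreRestoreExec` points out the same issue for `UnsafeRow`), so storing the reference without copying it leaves every buffered entry pointing at the last row. The sketch below models this with a hypothetical `MutableRow` class instead of Spark's `UnsafeRow`; it only demonstrates the aliasing problem, not the PR's buffering code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a reused UnsafeRow: one mutable instance per iterator.
final class MutableRow(var key: String, var time: Long) {
  def copy(): MutableRow = new MutableRow(key, time)
  override def toString: String = s"($key,$time)"
}

object RowReuseDemo {
  // Like a reusing projection: every element returned is the same `shared` object.
  def reusingIterator(data: Seq[(String, Long)]): Iterator[MutableRow] = {
    val shared = new MutableRow("", 0L)
    data.iterator.map { case (k, t) =>
      shared.key = k
      shared.time = t
      shared
    }
  }

  // Buffers references only: all entries alias `shared` and end up equal to the last row.
  def bufferWithoutCopy(data: Seq[(String, Long)]): List[String] = {
    val buf = ArrayBuffer.empty[MutableRow]
    reusingIterator(data).foreach(buf += _)
    buf.map(_.toString).toList
  }

  // Copies each row before buffering: correct, but costs one allocation per input row.
  def bufferWithCopy(data: Seq[(String, Long)]): List[String] = {
    val buf = ArrayBuffer.empty[MutableRow]
    reusingIterator(data).foreach(row => buf += row.copy())
    buf.map(_.toString).toList
  }
}
```

The copy happens once per input row regardless of how many rows are held at the same time, which is the cost described in the comment.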




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xuanyuanking commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-805714822


   Just verified the benchmark, with similar results on my side: the test with plenty of rows per session shows about a 2X difference (70000 vs 37000). I can see many spilling logs for ExternalAppendOnlyMap. Maybe I can do some tuning and performance analysis later; I will review #31937 first.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784574205


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39968/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780448934


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39776/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r597938585



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       That is one of the concerns. Another concern is that buffering a row requires "copying" it, which means every input row gets copied. I see there are multiple physical operators that buffer rows, which makes me wonder about the performance and resource usage.
   
   I'll need to check whether the performance is really on par with mine. I think the major complexity of mine was introduced by the linked-list state format. Migrating the state format to the one agreed on here would reduce the complexity significantly, so after applying that change to mine, we could reevaluate both properly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780414291


   **[Test build #135195 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135195/testReport)** for PR 31570 at commit [`a95c5be`](https://github.com/apache/spark/commit/a95c5be7b744ce180d255afc6fb8ff1f6c0c7569).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780169982






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r576596507



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -262,6 +262,17 @@ object AggUtils {
    *  - PartialMerge (now there is at most 1 tuple per group)
    *  - StateStoreSave (saves the tuple for the next batch)
    *  - Complete (output the current result of the aggregation)
+   *
+   * Plans a streaming aggregation with Session Window using the following progression:
+   *  - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
+   *  - Partial Aggregation (now there is at most 1 tuple per group)
+   *  - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from

Review comment:
       This process was simplified compared with the original PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779661448


   **[Test build #135167 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135167/testReport)** for PR 31570 at commit [`b774140`](https://github.com/apache/spark/commit/b7741406a36182967bc4ab23374a7ba95c12d417).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781829637


   **[Test build #135256 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135256/testReport)** for PR 31570 at commit [`fc3d122`](https://github.com/apache/spark/commit/fc3d1224bf2f66dd8c30bc58db1ade37bfdcad1e).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780540665


   **[Test build #135195 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135195/testReport)** for PR 31570 at commit [`a95c5be`](https://github.com/apache/spark/commit/a95c5be7b744ce180d255afc6fb8ff1f6c0c7569).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800892816


   **[Test build #136153 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/136153/testReport)** for PR 31570 at commit [`cf65c2a`](https://github.com/apache/spark/commit/cf65c2a06e5634c313ce6db4483dcbdc4f5e3030).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596658962



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -438,6 +443,434 @@ case class StateStoreSaveExec(
   }
 }
 
+/**
+ * For each input tuple, the key is calculated and the value from the [[StateStore]] is added
+ * to the stream (in addition to the input tuple) if present.
+ *
+ * The keyExpressions should exclude the sessionWindow expression.
+ */
+case class SessionWindowStateStoreRestoreExec(
+    keyExpressions: Seq[Attribute],
+    timeExpression: Attribute,
+    stateInfo: Option[StatefulOperatorStateInfo],
+    child: SparkPlan)
+  extends UnaryExecNode with StateStoreReader {
+
+  import StreamingSessionWindowHelper._
+
+  private val storeConf = new StateStoreConf(sqlContext.conf)
+  private val hadoopConfBcast = sparkContext.broadcast(
+    new SerializableConfiguration(SessionState.newHadoopConf(
+      sparkContext.hadoopConfiguration, sqlContext.conf)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    assert(keyExpressions.nonEmpty, "Grouping key must be specified when using sessionWindow")
+
+    val stateVersion = conf.getConf(SQLConf.STREAMING_SESSION_WINDOW_STATE_FORMAT_VERSION)
+    val stateStoreCoord = sqlContext.sessionState.streamingQueryManager.stateStoreCoordinator
+
+    child.execute().mapPartitionsWithStateStoreAwareRDD(
+      getStateInfo,
+      StreamingSessionWindowStateManager.allStateStoreNames(stateVersion),
+      stateStoreCoord) { case (partitionId, iter) =>
+
+      val stateStoreManager = StreamingSessionWindowStateManager.createStateManager(
+        keyExpressions, timeExpression, child.output, child.output, stateInfo, storeConf,
+        hadoopConfBcast.value.value, partitionId, stateVersion)
+
+      var preKey: UnsafeRow = null
+      iter.flatMap { row =>
+        val key = stateStoreManager.getKey(row)
+        val startTime = stateStoreManager.getStartTime(row)
+        var savedState: Seq[UnsafeRow] = null
+
+        // For each key, read from the state store only once.
+        // e.g. the iterator may contain the elements below
+        // |  key  |  window | value |
+        // |   a   |   w1    | xx    |
+        // |   a   |   w2    | xx    |
+        // |   b   |   w3    | xx    |
+        // |   c   |   w4    | xx    |
+        // For key a, which spans different windows, we read from the
+        // state store only once; otherwise the result would be wrong.
+        if (preKey == null || key != preKey) {
+          savedState = stateStoreManager.getStates(key)
+
+          // The key must be copied. It is an UnsafeRow pointing to memory that is
+          // overwritten by the next `getKey` call, so without a copy the value of
+          // preKey would change along with it.
+          //
+          // e.g. if key = a and preKey is assigned without a copy, then once the next
+          // `getKey` call in flatMap returns b, preKey is also b, whereas the expected
+          // value of preKey is a.
+          preKey = key.copy
+        }
+
+        if (savedState == null) {
+          numOutputRows += 1
+          Seq(row)
+        } else {
+          val outputs = savedState :+ row

Review comment:
       As I said, I don't think this retains the overall order, hence it requires an additional sort. `savedState` can be injected anywhere in the input rows.
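One way the extra sort could be avoided, sketched under assumptions: if the saved sessions for a key are kept sorted by start time and the input rows for that key arrive sorted by start time, the two can be merged in linear time instead of emitting `savedState :+ row`. Rows are modeled here as hypothetical `(key, startTime)` pairs, and `SortedStateMerge` and the `state` map are illustrative stand-ins for the state store manager, not APIs from the PR. As in the diff, the state is looked up once per key.

```scala
import scala.annotation.tailrec

object SortedStateMerge {
  /** Merges two lists that are each sorted ascending into one sorted list, in O(n + m). */
  def mergeSorted(saved: List[Long], input: List[Long]): List[Long] = {
    @tailrec
    def loop(a: List[Long], b: List[Long], acc: List[Long]): List[Long] = (a, b) match {
      case (Nil, _)           => acc.reverse ::: b
      case (_, Nil)           => acc.reverse ::: a
      case (x :: xs, y :: ys) =>
        if (x <= y) loop(xs, b, x :: acc) else loop(a, ys, y :: acc)
    }
    loop(saved, input, Nil)
  }

  /**
   * Restores state for an input already sorted by (key, start): looks up the saved
   * starts once per key and merges them into that key's rows, keeping global order.
   * Keys present only in `state` are not emitted, since only keys in this batch are read.
   */
  def restore(input: List[(String, Long)], state: Map[String, List[Long]]): List[(String, Long)] =
    input match {
      case Nil => Nil
      case (key, _) :: _ =>
        val (sameKey, rest) = input.span(_._1 == key)
        val saved = state.getOrElse(key, Nil) // single lookup per key
        mergeSorted(saved, sameKey.map(_._2)).map(start => (key, start)) ::: restore(rest, state)
    }
}
```

By contrast, emitting the saved state `[0, 20]` in front of key `a`'s rows `[10, 30]`, as `savedState :+ row` does, yields `[0, 20, 10, 30]`, which is out of order.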




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780474823


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39776/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800944282


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/40737/
   


[GitHub] [spark] viirya commented on a change in pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r576596507



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -262,6 +262,17 @@ object AggUtils {
    *  - PartialMerge (now there is at most 1 tuple per group)
    *  - StateStoreSave (saves the tuple for the next batch)
    *  - Complete (output the current result of the aggregation)
+   *
+   * Plans a streaming aggregation with Session Window using the following progression:
+   *  - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
+   *  - Partial Aggregation (now there is at most 1 tuple per group)
+   *  - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from

Review comment:
       This process was simplified compared with the original PR. I removed the unnecessary `PartialMerge`s so that the session window streaming aggregation has the same physical structure as a general streaming aggregation.
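   The toy model below makes the simplified progression concrete. It uses plain Scala collections, assumes a count aggregate, and uses an in-memory `Map` in place of the state store. For each micro-batch it performs these steps: assign sessions and partially aggregate within the batch, restore the saved sessions for each key, merge overlapping sessions, and keep the result as the new state. The hunk is cut off after `SessionStateStoreRestore`, so the merge and save steps here are assumptions. All names (`Event`, `Session`, `SessionPipeline`, `processBatch`) are hypothetical, and this is not the PR's physical operator code.

   ```scala
   // Hypothetical input event: grouping key and event time (same unit as `gap`).
   final case class Event(key: String, time: Long)
   // Hypothetical aggregated session covering [start, end) with a count aggregate.
   final case class Session(key: String, start: Long, end: Long, count: Long)

   object SessionPipeline {
     /** Session window assignment + partial aggregation within one batch:
       * events of the same key closer than `gap` fall into the same session. */
     def assignAndAggregate(events: Seq[Event], gap: Long): Seq[Session] =
       events.groupBy(_.key).toSeq.flatMap { case (key, evs) =>
         evs.map(_.time).sorted.foldLeft(List.empty[Session]) {
           case (cur :: done, t) if t < cur.end =>
             cur.copy(end = math.max(cur.end, t + gap), count = cur.count + 1) :: done
           case (acc, t) => Session(key, t, t + gap, 1L) :: acc
         }.reverse
       }

     /** Merges sessions (sorted by start) whose [start, end) ranges overlap. */
     def mergeOverlapping(sorted: Seq[Session]): Seq[Session] =
       sorted.foldLeft(List.empty[Session]) {
         case (cur :: done, s) if s.start < cur.end =>
           cur.copy(end = math.max(cur.end, s.end), count = cur.count + s.count) :: done
         case (acc, s) => s :: acc
       }.reverse

     /** One micro-batch: aggregate, restore saved sessions per key, merge, and
       * return the new state (standing in for StateStoreSave). */
     def processBatch(
         events: Seq[Event],
         gap: Long,
         state: Map[String, Seq[Session]]): Map[String, Seq[Session]] = {
       val partial = assignAndAggregate(events, gap).groupBy(_.key)
       (state.keySet ++ partial.keySet).iterator.map { k =>
         val restored = state.getOrElse(k, Seq.empty)
         val current = partial.getOrElse(k, Seq.empty)
         k -> mergeOverlapping((restored ++ current).sortBy(_.start))
       }.toMap
     }
   }
   ```

   In this model a late event that bridges two sessions gets merged with the restored session. This is why restore has to happen before the final merge.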




[GitHub] [spark] viirya commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-804282542


   @HeartSaVioR @xuanyuanking Okay. Sounds good to me. I also think append and complete modes might be enough for now, and we can work on update mode later if it takes longer.
   


[GitHub] [spark] viirya commented on a change in pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r576611393



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
##########
@@ -262,6 +262,17 @@ object AggUtils {
    *  - PartialMerge (now there is at most 1 tuple per group)
    *  - StateStoreSave (saves the tuple for the next batch)
    *  - Complete (output the current result of the aggregation)
+   *
+   * Plans a streaming aggregation with Session Window using the following progression:
+   *  - (Shuffle + Session Window Assignment, see `SessionWindowExec`)
+   *  - Partial Aggregation (now there is at most 1 tuple per group)
+   *  - SessionStateStoreRestore (now there is 1 tuple from this batch + optionally one from

Review comment:
       Since the unnecessary `PartialMerge`s were removed, I think this could improve performance.




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780887435


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135201/
   


[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-800989522


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136153/
   


[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784703441


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135388/
   


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596631016



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Used for calculating the session window start and end for each row, so this plan requires
+ * child distributed by sessionSpec and sorted by time column in each part. The value for
+ * window start is time value of the first row in this window, the value for window end is
+ * time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window, it is the rest column of
+ *                    groupingExpr in parent aggregate node.
+ * @param windowGap window gap in micro second.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       And if I understand correctly, this needs to copy all input rows because of the buffering. Now I can recall a bit more of the context of my patch: `MergingSessionsIterator` only copies the rows that are the first row of a session, since it only needs to retain the last session to compare against the current input row. That was the reason I accepted such complexity: performance, and also the concern raised on the JIRA issue about in-flight memory usage.
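   The memory argument can be illustrated with a small iterator. It assumes the input is already sorted by (key, time), as `requiredChildOrdering` requests. The iterator keeps only the session currently being built and emits that session as soon as a row arrives that cannot extend it, so it never buffers a whole partition. It follows the Scaladoc in the hunk above: the session start is the first row's time, and the end is the last row's time plus the gap. This is a simplified stand-in written for illustration, not the actual `MergingSessionsIterator`. `TimedRow`, `SessionAgg`, and `MergingSessions` are hypothetical names.

   ```scala
   // Hypothetical input row; the iterator assumes rows arrive sorted by (key, time).
   final case class TimedRow(key: String, time: Long)
   // Hypothetical output: one aggregated session with a row count.
   final case class SessionAgg(key: String, start: Long, end: Long, rows: Long)

   /** Emits one SessionAgg per session while retaining only the current session,
     * instead of buffering (and copying) every input row. */
   final class MergingSessions(input: Iterator[TimedRow], gap: Long) extends Iterator[SessionAgg] {
     // The only state kept across rows: the session currently being built.
     private var current: Option[SessionAgg] = None

     override def hasNext: Boolean = current.isDefined || input.hasNext

     override def next(): SessionAgg = {
       if (!hasNext) throw new NoSuchElementException("no more sessions")
       while (input.hasNext) {
         val row = input.next()
         current match {
           case Some(s) if s.key == row.key && row.time < s.end =>
             // Row falls within the gap: extend the session (end = last time + gap).
             current = Some(s.copy(end = row.time + gap, rows = s.rows + 1))
           case Some(s) =>
             // Row cannot extend it: start a new session and emit the finished one.
             current = Some(SessionAgg(row.key, row.time, row.time + gap, 1L))
             return s
           case None =>
             current = Some(SessionAgg(row.key, row.time, row.time + gap, 1L))
         }
       }
       // Input exhausted: emit the last open session.
       val last = current.get
       current = None
       last
     }
   }
   ```

   Only one `SessionAgg` is held at a time. A real operator would still need to copy the first row of each session (because Spark reuses row objects), but it would not need to copy every row.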




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780240326


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135181/
   


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784560832


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39968/
   


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780153193


   **[Test build #135180 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135180/testReport)** for PR 31570 at commit [`b774140`](https://github.com/apache/spark/commit/b7741406a36182967bc4ab23374a7ba95c12d417).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779661448


   **[Test build #135167 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135167/testReport)** for PR 31570 at commit [`b774140`](https://github.com/apache/spark/commit/b7741406a36182967bc4ab23374a7ba95c12d417).


[GitHub] [spark] xuanyuanking commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-804069039


   @viirya @HeartSaVioR I agree with both of you. I'm also running the local benchmark. My only concern is the `MergingSessionsIterator` detail; it would be great if we could separate that code in the first stage and maybe refactor it later.
   
   ```
   I'm dealing with update mode (required)
   ```
   If it takes too much time, I think the update mode and complete mode (maybe also optional) are good enough.
   


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779686249


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39748/
   


[GitHub] [spark] AmplabJenkins commented on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779627205






[GitHub] [spark] HeartSaVioR edited a comment on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-803218591


   I re-ran some performance tests I had done before, and I observed a significant difference between my revised PR (my PR + the state format used here) and this PR.
   
   > my revised PR (versioned as 3.2.0-SPARK-10816-heartsavior)
   
   https://github.com/HeartSaVioR/spark/tree/SPARK-10816-heartsavior-rebase-apply-PR-31570-versioned
   
   > this PR (versioned as 3.2.0-PR-31570)
   
   https://github.com/HeartSaVioR/spark/tree/PR-31570-versioned
   
   > benchmark code
   
   https://github.com/HeartSaVioR/iot-trucking-app-spark-structured-streaming/tree/benchmarking-SPARK-10816
   
   I built the benchmark code against the locally installed Spark artifacts for both (that is, I built the benchmark code once for each).
   
   Simply change `build.sbt` to update the Spark version to the custom one, and run `sbt clean assembly`.
   
   > machine to run benchmark
   
   * AMD Ryzen 5600X (no overclock, 3.7 GHz to 4.6 GHz, 6 physical cores, 12 logical cores)
   * DDR4 3200 MHz, 16 GB × 2
   * Ubuntu 20.04
   
   Using `local[*]` made performance unstable, so I fixed the value to 8. There aren't many physical cores, so I also reduced the number of shuffle partitions to 5.
   
   > plenty of rows in session
   
   ```
   ./bin/spark-submit --master "local[8]" --conf spark.sql.shuffle.partitions=5 --driver-memory 16g --class com.hortonworks.spark.benchmark.streaming.sessionwindow.plenty_of_rows_in_session.BenchmarkSessionWindowListenerWordCountSessionFunctionAppendMode ./iot-trucking-app-spark-structured-streaming-<version>.jar --query-status-file /tmp/a.json --rate-row-per-second 200000 --rate-ramp-up-time-second 10
   ```
   
   [plenty-of-rows-in-session-append-mode-mine-rate-200000-v1.txt](https://github.com/apache/spark/files/6174672/plenty-of-rows-in-session-append-mode-mine-rate-200000-v1.txt)
   
   [plenty-of-rows-in-session-append-mode-PR-31570-rate-200000-v1.txt](https://github.com/apache/spark/files/6174674/plenty-of-rows-in-session-append-mode-PR-31570-rate-200000-v1.txt)
   
   * mine showed 160,000+ for processedRowsPerSecond.
   * PR-31570 didn't reach 60,000 for processedRowsPerSecond.
   
   > plenty of keys
   
   ```
   ./bin/spark-submit --master "local[8]" --conf spark.sql.shuffle.partitions=5 --driver-memory 16g --class com.hortonworks.spark.benchmark.streaming.sessionwindow.plenty_of_keys.BenchmarkSessionWindowListenerWordCountSessionFunctionAppendMode ./iot-trucking-app-spark-structured-streaming-<version>.jar --query-status-file /tmp/b.json --rate-row-per-second 12000000 --rate-ramp-up-time-second 10
   ```
   
   [plenty-of-keys-append-mode-mine-rate-12000000-v1.txt](https://github.com/apache/spark/files/6174671/plenty-of-keys-append-mode-mine-rate-12000000-v1.txt)
   
   [plenty-of-keys-append-mode-PR-31570-rate-12000000-v1.txt](https://github.com/apache/spark/files/6174675/plenty-of-keys-append-mode-PR-31570-rate-12000000-v1.txt)
   
   * mine showed "over" 12,000,000 for processedRowsPerSecond. (It could probably reach more if we increased the rate.)
   * PR-31570 didn't reach 10,000,000 for processedRowsPerSecond.
   
   I'd appreciate it if anyone reviewing could run performance tests on their side and share the results. I'd love to see results that contradict my perf tests (either my tests with a different env/config, or new tests), but if no one produces results contradicting mine, I guess we all know which direction we need to put our effort into.
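   A small helper like the one below may make it easier to compare runs such as the two above. It summarizes `processedRowsPerSecond` samples. It assumes the values have already been extracted from the query progress output into a `Seq[Double]`, and it drops a fixed number of leading samples so the rate ramp-up period is excluded. `ThroughputSummary`, `Throughput.summarize`, and the `warmUp` parameter are hypothetical and are not part of the benchmark repository.

   ```scala
   /** Hypothetical summary of processedRowsPerSecond samples for one benchmark run. */
   final case class ThroughputSummary(samples: Int, mean: Double, median: Double, min: Double)

   object Throughput {
     /** Drops `warmUp` leading samples (e.g. covering the rate ramp-up), then summarizes the rest. */
     def summarize(processedRowsPerSecond: Seq[Double], warmUp: Int): ThroughputSummary = {
       val steady = processedRowsPerSecond.drop(warmUp).sorted.toVector
       require(steady.nonEmpty, "no samples left after dropping warm-up samples")
       val n = steady.length
       val median =
         if (n % 2 == 1) steady(n / 2)
         else (steady(n / 2 - 1) + steady(n / 2)) / 2.0
       ThroughputSummary(n, steady.sum / n, median, steady.head)
     }
   }
   ```

   Reporting the median and minimum alongside the mean helps show whether a difference like "160,000+ vs. under 60,000" holds for the whole steady-state period or only for a few micro-batches.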


[GitHub] [spark] viirya edited a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779996209


   This approach is intuitive, so the change footprint is relatively small. It is also easier to maintain. Performance-wise, it is on par with the alternative one.
   
   I optimized it further from the original implementation. There are some things I think we can work on, e.g. isolating the state store format into a manager class so we can change the format if needed, optimizing the state store format, dynamic session window gaps, etc. I would not do that here until we reach a consensus on whether to pursue this approach.
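   The sketch below is a hypothetical illustration of "isolating the state store format into a manager class". The operator talks only to a versioned trait, so a new layout could be introduced by adding another implementation and selecting it by version (for example, a version persisted with the query checkpoint). The trait, the version number, and the string encoding are all assumptions made for illustration. The PR's actual state format is not shown here.

   ```scala
   // Hypothetical stored session for one key: [start, end) in event time.
   final case class StoredSession(start: Long, end: Long)

   /** Hypothetical manager that hides how sessions are laid out in the state store. */
   trait SessionStateFormat {
     def version: Int
     def encode(sessions: Seq[StoredSession]): String
     def decode(value: String): Seq[StoredSession]
   }

   /** Version 1: all sessions of a key packed into a single value. */
   object PackedSessionsV1 extends SessionStateFormat {
     val version = 1
     def encode(sessions: Seq[StoredSession]): String =
       sessions.map(s => s"${s.start}:${s.end}").mkString(",")
     def decode(value: String): Seq[StoredSession] =
       if (value.isEmpty) Seq.empty
       else value.split(",").toSeq.map { part =>
         part.split(":") match {
           case Array(start, end) => StoredSession(start.toLong, end.toLong)
           case _ => throw new IllegalArgumentException(s"malformed session entry: $part")
         }
       }
   }

   object SessionStateFormats {
     // Registering a new format (e.g. a hypothetical V2) only touches this list.
     private val formats: Map[Int, SessionStateFormat] =
       Seq[SessionStateFormat](PackedSessionsV1).map(f => f.version -> f).toMap

     def forVersion(v: Int): SessionStateFormat =
       formats.getOrElse(v, throw new IllegalArgumentException(s"unsupported state format version: $v"))
   }
   ```

   Keeping encode/decode behind one interface means that optimizing the state format later would not require touching the restore/save operators.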
   
   BTW, I may also take time to play with the alternative approach.
   
   cc @dbtsai 
   
   


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780111186


   **[Test build #135181 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135181/testReport)** for PR 31570 at commit [`e1bb4af`](https://github.com/apache/spark/commit/e1bb4af4c08e6139d7b4d9578327ec7ac83b1623).


[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780046779


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39761/
   


[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596631016



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowExec.scala
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
+/**
+ * Calculates the session window start and end for each row, so this plan requires the
+ * child to be distributed by sessionSpec and sorted by the time column within each partition.
+ * The window start is the time value of the first row in the window, and the window end is
+ * the time value of the last row plus the windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param timeColumn the event-time column used to order rows within each partition.
+ * @param sessionSpec the partition key of this session window, i.e. the remaining columns of
+ *                    groupingExpr in the parent aggregate node.
+ * @param windowGap window gap in microseconds.
+ * @param child child plan for this node.
+ */
+case class SessionWindowExec(
+    windowExpressions: NamedExpression,
+    timeColumn: Expression,
+    sessionSpec: Seq[Expression],
+    windowGap: Long,
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+    Seq(sessionSpec.map(SortOrder(_, Ascending)) :+ SortOrder(timeColumn, Ascending))
+
+  override def producedAttributes: AttributeSet = AttributeSet(windowExpressions.toAttribute)
+
+  override def output: Seq[Attribute] = child.output ++ Seq(windowExpressions.toAttribute)
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Produces the result of the query as an `RDD[InternalRow]`
+   *
+   * Overridden by concrete implementations of SparkPlan.
+   */
+  override protected def doExecute(): RDD[InternalRow] = {

Review comment:
       And if I understand correctly, this needs to copy all input rows because of the buffering. Now that I have reloaded some of the context of my patch: MergingSessionsIterator only copies the rows that are the first row of a session, as it only needs to retain the last session to compare with the current input row. That was the reason I chose that complexity: performance, and also there were concerns in the JIRA issue about in-flight memory usage.
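A minimal sketch of the "retain only the last session" idea described above, assuming rows arrive clustered by key and sorted by event time (the ordering SessionWindowExec requires) and treating session ends as exclusive. `Event`, `Session`, and `MergingWithAggregating` are hypothetical names for illustration, not Spark classes, and the count stands in for an arbitrary aggregate. Only the currently open session is kept in memory instead of a copy of every input row.

```scala
// Hypothetical sketch, not the PR's code or Spark's MergingSessionsIterator.
// Input must be clustered by key and sorted by (key, time).
final case class Event(key: String, time: Long)
final case class Session(key: String, start: Long, end: Long, count: Long)

object MergingWithAggregating {
  def sessions(sorted: Iterator[Event], gap: Long): Iterator[Session] =
    new Iterator[Session] {
      // The single open session; no input rows are buffered.
      private var current: Option[Session] = None
      // A closed session waiting to be returned by next().
      private var pending: Option[Session] = None

      private def advance(): Unit = {
        while (pending.isEmpty && sorted.hasNext) {
          val e = sorted.next()
          current match {
            // Same key and the event falls before the (exclusive) session end: extend it.
            case Some(s) if s.key == e.key && e.time < s.end =>
              current = Some(s.copy(end = math.max(s.end, e.time + gap), count = s.count + 1))
            // Otherwise close the open session (if any) and start a new one.
            case other =>
              pending = other
              current = Some(Session(e.key, e.time, e.time + gap, 1L))
          }
        }
        // Input exhausted: flush the last open session.
        if (pending.isEmpty && !sorted.hasNext) {
          pending = current
          current = None
        }
      }

      override def hasNext: Boolean = { advance(); pending.nonEmpty }

      override def next(): Session = {
        advance()
        val s = pending.getOrElse(throw new NoSuchElementException("no more sessions"))
        pending = None
        s
      }
    }
}
```

For example, with a gap of 10 and events a@0, a@5, a@20, b@3 (sorted by key, then time), the iterator yields `Session(a, 0, 15, 2)`, `Session(a, 20, 30, 1)` and `Session(b, 3, 13, 1)`.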






[GitHub] [spark] HeartSaVioR commented on a change in pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #31570:
URL: https://github.com/apache/spark/pull/31570#discussion_r596654034



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SessionWindowMergeExec.scala
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning}
+
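+/**
+ * The physical plan for streaming queries that merges session windows after they are restored
+ * from the state store.
+ * Note: the end time of a window restored from the state store already includes the session
+ * windowGap.
+ *
+ * @param windowExpressions session window expression for the exec node.
+ * @param sessionSpec the partition key of this session window.
+ * @param child child plan for this node.
+ */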
+case class SessionWindowMergeExec(
+    windowExpressions: NamedExpression,
+    sessionSpec: Seq[Expression],
+    child: SparkPlan)
+  extends UnaryExecNode {
+
+  override def requiredChildDistribution: Seq[Distribution] = {
+    ClusteredDistribution(sessionSpec) :: Nil
+  }
+
+  // Data should be sorted, so we can merge session window directly.
+  // TODO: use this requirement for simplicity, not necessary to sort the whole dataset,
+  // try better way later.

Review comment:
       If I remember correctly, we still have to sort to handle new events that are earlier than the sessions stored in the state store. With event-time processing, events can arrive out of order.
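A minimal single-key sketch of why the sort is still needed. It assumes, as the SessionWindowMergeExec doc comment states, that restored sessions already include the gap in their end time, and it treats session ends as exclusive; `Span` and `MergeWithState` are hypothetical names, not the PR's code. Because a late event can start before every restored session, the combined input is sorted by start before one linear merge pass.

```scala
// Hypothetical sketch: merge sessions restored from the state store with
// sessions built from new (possibly late) events, for a single key.
final case class Span(start: Long, end: Long) // end is exclusive and already includes the gap

object MergeWithState {
  def merge(restored: Seq[Span], newEventTimes: Seq[Long], gap: Long): Seq[Span] = {
    // Each new event opens a candidate session [t, t + gap).
    val fresh = newEventTimes.map(t => Span(t, t + gap))
    // Late events may precede restored sessions, so sort by start first.
    val sorted = (restored ++ fresh).sortBy(_.start)
    sorted.foldLeft(List.empty[Span]) {
      // Overlaps the session being built: extend it.
      case (cur :: done, next) if next.start < cur.end =>
        Span(cur.start, math.max(cur.end, next.end)) :: done
      // Otherwise start a new session.
      case (acc, next) => next :: acc
    }.reverse
  }
}
```

For example, with a restored session `Span(10, 25)`, new events at 2 and 30, and a gap of 10, the sorted pass yields `Span(2, 25)` and `Span(30, 40)`. Without the sort, the late event at 2 would be folded into the session starting at 10 and its start would be lost.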






[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780140564


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/39762/
   




[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781829637


   **[Test build #135256 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135256/testReport)** for PR 31570 at commit [`fc3d122`](https://github.com/apache/spark/commit/fc3d1224bf2f66dd8c30bc58db1ade37bfdcad1e).






[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780813231


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39782/
   




[GitHub] [spark] SparkQA commented on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779568770


   **[Test build #135160 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135160/testReport)** for PR 31570 at commit [`a5efe1b`](https://github.com/apache/spark/commit/a5efe1bab1db7290b5d35f64ac76ed389fb0ab23).




[GitHub] [spark] viirya commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779997222


   I'd like to invite some streaming folks to look at this @HeartSaVioR @xuanyuanking @gaborgsomogyi @zsxwing 




[GitHub] [spark] SparkQA commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784702699


   **[Test build #135388 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135388/testReport)** for PR 31570 at commit [`11dc533`](https://github.com/apache/spark/commit/11dc53376bcedb9090a707c313a60e063f03817f).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-780541394


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/135195/
   




[GitHub] [spark] viirya commented on pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-803912957


   Thanks for re-evaluating the two approaches. It is valuable.
   
   Basically, by leveraging the new state store format, the two previous efforts are now pretty close, except for how they handle session merging.
   
   No worries. The precondition for picking the simpler approach is that the two approaches have similar performance. I remember this was claimed in the JIRA, but the re-evaluation gives us different numbers.
   
   I ran the benchmark locally. Because my machine is different, I cannot reproduce the same numbers, but I can see a significant difference between the two approaches, i.e., 1) merging then aggregating and 2) merging with aggregating.
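To make the two strategies named above concrete, here is a toy Scala model for a single key whose event times are already sorted, with exclusive session ends. `TwoStrategies` and its methods are hypothetical illustrations, not the PR's code or the benchmark, and they reproduce no performance numbers; they only show where the first strategy materializes one extra tagged row per input row before aggregating.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model for one key's event times, sorted ascending.
// Each result tuple is (sessionStart, sessionEnd, rowCount); ends are exclusive.
object TwoStrategies {
  // 1) Merging then aggregating: tag every input row with its session start,
  //    materializing one (start, time) pair per row, then group and aggregate.
  def mergeThenAggregate(times: Seq[Long], gap: Long): Seq[(Long, Long, Int)] = {
    val tagged = ArrayBuffer.empty[(Long, Long)] // (sessionStart, eventTime)
    var start = 0L
    var end = Long.MinValue
    for (t <- times) {
      if (t >= end) start = t // the previous session has closed; open a new one
      end = t + gap
      tagged += ((start, t))
    }
    tagged.groupBy(_._1).toSeq.sortBy(_._1).map { case (s, rows) =>
      (s, rows.map(_._2).max + gap, rows.size)
    }
  }

  // 2) Merging with aggregating: fold each row straight into the open session,
  //    keeping only that session's (start, end, count).
  def mergeWithAggregate(times: Seq[Long], gap: Long): Seq[(Long, Long, Int)] =
    times.foldLeft(List.empty[(Long, Long, Int)]) {
      case ((s, e, n) :: done, t) if t < e => (s, t + gap, n + 1) :: done
      case (acc, t) => (t, t + gap, 1) :: acc
    }.reverse
}
```

For times 0, 5, 20 and a gap of 10, both functions return `(0, 15, 2)` and `(20, 30, 1)`; they agree on the result and differ only in the intermediate per-row data the first one builds.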
    
   I think we have a few options.
   
   1. Replace the current approach with merging with aggregating (`MergingSessionsIterator`). I'm doing this locally to see if we can get similar numbers. It'd also be good if @HeartSaVioR would like to create a PR against this, so it is easier to incorporate commits authored by all parties. It is also fine if @HeartSaVioR wants to work on it after this is merged.
   2. Switch to the other previous effort + new state store format.
   
   Either works for me. Actually, the two options are basically the same logic to me, except for some cosmetic differences.
   
   @xuanyuanking WDYT?
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-781876774


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39836/
   




[GitHub] [spark] viirya closed pull request #31570: [SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
viirya closed pull request #31570:
URL: https://github.com/apache/spark/pull/31570


   




[GitHub] [spark] SparkQA removed a comment on pull request #31570: [WIP][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-784534508


   **[Test build #135388 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/135388/testReport)** for PR 31570 at commit [`11dc533`](https://github.com/apache/spark/commit/11dc53376bcedb9090a707c313a60e063f03817f).




[GitHub] [spark] AmplabJenkins commented on pull request #31570: [NOT-MERGE][SPARK-10816][SS] SessionWindow support for Structure Streaming

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31570:
URL: https://github.com/apache/spark/pull/31570#issuecomment-779599239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/39741/
   

