Posted to commits@spark.apache.org by vi...@apache.org on 2021/08/12 03:12:32 UTC

[spark] branch branch-3.2 updated: [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 9df850d  [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
9df850d is described below

commit 9df850df9e210372124e579b78e6bfb5aac6ab15
Author: Jungtaek Lim <ka...@gmail.com>
AuthorDate: Wed Aug 11 20:10:59 2021 -0700

    [SPARK-36480][SS] SessionWindowStateStoreSaveExec should not filter input rows against watermark
    
    ### What changes were proposed in this pull request?
    
    This PR removes the watermark filter applied to input rows in SessionWindowStateStoreSaveExec. SessionWindowStateStoreSaveExec is expected to store all inputs in the state store and apply eviction afterwards.
    
    ### Why are the changes needed?
    
    The existing code is logically incorrect, although I could not reproduce an actual problem.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests. I could not come up with a case that fails on the previous code, so the change should be verified by reviewing the logic instead.
    
    Closes #33708 from HeartSaVioR/SPARK-36480.
    
    Authored-by: Jungtaek Lim <ka...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
    (cherry picked from commit fac4e5eb3eae4cafe7bd6672911792612c2aaca0)
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../spark/sql/execution/streaming/statefulOperators.scala     | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 0a2d5ad..3431823 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -585,7 +585,12 @@ case class SessionWindowStateStoreRestoreExec(
 }
 
 /**
- * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]].
+ * This class replaces existing sessions for the grouping key with new sessions in state store.
+ * All inputs are valid on storing into state store; don't filter out via watermark while storing.
+ * Refer the method doc of [[StreamingSessionWindowStateManager.updateSessions]] for more details.
+ *
+ * This class will provide the output according to the output mode.
+ * Update mode is not supported as the semantic is not feasible for session window.
  */
 case class SessionWindowStateStoreSaveExec(
     keyWithoutSessionExpressions: Seq[Attribute],
@@ -642,9 +647,7 @@ case class SessionWindowStateStoreSaveExec(
         // Assumption: watermark predicates must be non-empty if append mode is allowed
         case Some(Append) =>
           allUpdatesTimeMs += timeTakenMs {
-            val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
-              watermarkPredicateForData.get)
-            putToStore(filteredIter, store)
+            putToStore(iter, store)
           }
 
           val removalStartTimeNs = System.nanoTime
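
The ordering described in the commit message (store every input, then evict by watermark) can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Session`, `StoreThenEvictSketch`, `evict`, `appendStep`, and the in-memory map are hypothetical names invented for illustration and are not Spark APIs, and the local `putToStore` only mimics the "replace existing sessions for the grouping key" behaviour mentioned in the new class doc. The commit author reports being unable to reproduce an actual problem in Spark, so the difference shown here is a property of the toy model, not a confirmed Spark bug.

```scala
import scala.collection.mutable

// Toy model only: every name here is hypothetical and is not a Spark API.
final case class Session(key: String, startMs: Long, endMs: Long)

object StoreThenEvictSketch {
  type State = mutable.Map[String, Seq[Session]]

  // Replace all stored sessions for a key with the sessions given for that key.
  def putToStore(rows: Iterator[Session], state: State): Unit =
    rows.toList.groupBy(_.key).foreach { case (k, ss) => state(k) = ss }

  // Remove sessions that ended at or before the watermark and return them as output.
  def evict(state: State, watermarkMs: Long): Seq[Session] = {
    val evicted = state.values.flatten.filter(_.endMs <= watermarkMs).toList
    state.keys.toList.foreach { k =>
      val kept = state(k).filter(_.endMs > watermarkMs)
      if (kept.isEmpty) state.remove(k) else state(k) = kept
    }
    evicted
  }

  // One append-mode step: optionally filter the input first, then store, then evict.
  def appendStep(rows: Seq[Session], watermarkMs: Long, filterFirst: Boolean): Seq[Session] = {
    val state: State = mutable.Map.empty
    val input = if (filterFirst) rows.filter(_.endMs > watermarkMs) else rows
    putToStore(input.iterator, state)
    evict(state, watermarkMs)
  }
}
```

With two sessions for key "a", one ending at 10 and one ending at 30, and a watermark of 15, `appendStep(..., filterFirst = false)` stores both and emits the session ending at 10 during eviction. With `filterFirst = true`, that session never reaches the store in this toy model, so eviction emits nothing.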
