Posted to commits@spark.apache.org by ka...@apache.org on 2022/03/30 03:02:51 UTC

[spark] branch master updated: [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a445536  [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0
a445536 is described below

commit a44553648f75e9243b8a7dc27185ae6901f35f94
Author: nyingping <sm...@163.com>
AuthorDate: Wed Mar 30 12:00:40 2022 +0900

    [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0
    
    ### What changes were proposed in this pull request?
    
    With a static gapDuration on a session window, there is no need to filter events by time range when the gap duration is statically known to be greater than 0.
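    
    A minimal sketch of a query that benefits (assuming a DataFrame `df` with a timestamp column `time`; this snippet is illustrative, not part of the patch):
    
    ```
    // The gap is a static, positive literal, so SESSION_END > SESSION_START
    // always holds and the analyzer no longer needs the range filter.
    df.groupBy(session_window($"time", "10 seconds"))
      .agg(count("*").as("counts"))
    ```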
    
    ### Why are the changes needed?
    
    Skipping the now-unnecessary range filter saves computation and improves performance.
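    
    The core of the change is a static check on the gap duration; a sketch mirroring the patch below (`session.gapDuration`, `Literal`, `CalendarInterval`, and `CalendarIntervalType` come from the analyzer rule shown in the diff):
    
    ```
    // Keep the SESSION_END > SESSION_START filter only when the gap is not
    // a statically-known positive CalendarInterval literal.
    val filterByTimeRange = session.gapDuration match {
      case Literal(interval: CalendarInterval, CalendarIntervalType) =>
        interval == null || interval.months + interval.days + interval.microseconds <= 0
      case _ => true // dynamic or non-literal gap: keep the filter
    }
    ```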
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a new unit test and a benchmark.
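    
    For a quick manual check (a hypothetical spark-shell snippet, not part of the patch), the analyzed plan can be inspected for the range predicate:
    
    ```
    import org.apache.spark.sql.functions._
    val df = Seq(("2016-03-27 19:39:30", 1)).toDF("time", "value")
    // With a static positive gap, the Filter above the Project reduces to
    // IsNotNull(time): no session_end > session_start predicate remains.
    df.groupBy(session_window($"time", "10 seconds"))
      .count()
      .queryExecution.analyzed
    ```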
    
    A simple benchmark on [9dae8a5](https://github.com/nyingping/spark/commit/9dae8a555e82a59d2160bbb4518704cec81b219e); thanks again to [HeartSaVioR@d532b6f](https://github.com/HeartSaVioR/spark/commit/d532b6f6bcdd80cdaac520b21587ebb69ff2df8f).
    
    ---------------------------------------
    case 1
    ---------------------------------------
    ```
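    // numOfRow is the benchmark's input row count (a placeholder here);
    // session_window and col come from org.apache.spark.sql.functions.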
    spark.range(numOfRow)
          .selectExpr("CAST(id AS timestamp) AS time")
          .select(session_window(col("time"), "10 seconds"))
          .count()
    ```
    
    Result:
    
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_291-b10 on Windows 10 10.0
    AMD64 Family 23 Model 96 Stepping 1, AuthenticAMD
    session windows:                          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------
    old logic                                          1688           1730          61          5.9         168.8       1.0X
    new logic                                            21             26           5        487.3           2.1      82.3X
    ```
    
    Closes #35680 from nyingping/SPARK-38349.
    
    Lead-authored-by: nyingping <sm...@163.com>
    Co-authored-by: Nie yingping <sm...@163.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 16 +++++--
 .../spark/sql/DataFrameSessionWindowingSuite.scala | 56 +++++++++++++++++++++-
 2 files changed, 67 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6d95067..f69f17d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -57,7 +57,7 @@ import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.DayTimeIntervalType.DAY
 import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils
 
 /**
@@ -4058,10 +4058,20 @@ object SessionWindowing extends Rule[LogicalPlan] {
           case s: SessionWindow => sessionAttr
         }
 
+        val filterByTimeRange = session.gapDuration match {
+          case Literal(interval: CalendarInterval, CalendarIntervalType) =>
+            interval == null || interval.months + interval.days + interval.microseconds <= 0
+          case _ => true
+        }
+
         // As with the tumbling window, we add a filter to filter out nulls.
         // We also filter out events with a negative, zero, or invalid gap duration.
-        val filterExpr = IsNotNull(session.timeColumn) &&
-          (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
+        val filterExpr = if (filterByTimeRange) {
+          IsNotNull(session.timeColumn) &&
+            (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
+        } else {
+          IsNotNull(session.timeColumn)
+        }
 
         replacedPlan.withNewChildren(
           Filter(filterExpr,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
index a5414f3..4c2d0f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
@@ -22,8 +22,8 @@ import java.time.LocalDateTime
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter, LogicalPlan, Project}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest with SharedSparkSession
       validateWindowColumnInSchema(schema2, "session")
     }
   }
+
+  test("SPARK-38349: No need to filter events when gapDuration greater than 0") {
+    // negative gap duration
+    check("-5 seconds", true, "Need to filter events when gap duration less than 0")
+
+    // positive gap duration
+    check("5 seconds", false, "No need to filter events when gap duration greater than 0")
+
+    // invalid gap duration
+    check("x seconds", true, "Need to filter events when gap duration invalid")
+
+    // dynamic gap duration
+    check(when(col("time").equalTo("1"), "5 seconds")
+      .when(col("time").equalTo("2"), "10 seconds")
+      .otherwise("10 seconds"), true, "Need to filter events when gap duration dynamically")
+
+    def check(
+        gapDuration: Any,
+        expectTimeRange: Boolean,
+        assertHintMsg: String): Unit = {
+      val data = Seq(
+        ("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")
+      val df = if (gapDuration.isInstanceOf[String]) {
+        data.groupBy(session_window($"time", gapDuration.asInstanceOf[String]))
+      } else {
+        data.groupBy(session_window($"time", gapDuration.asInstanceOf[Column]))
+      }
+      val aggregate = df.agg(count("*").as("counts"))
+        .select($"session_window.start".cast("string"), $"session_window.end".cast("string"),
+          $"counts")
+
+      checkFilterCondition(aggregate.queryExecution.logical, expectTimeRange, assertHintMsg)
+    }
+
+    def checkFilterCondition(
+        logicalPlan: LogicalPlan,
+        expectTimeRange: Boolean,
+        assertHintMsg: String): Unit = {
+      val filter = logicalPlan.find { plan =>
+        plan.isInstanceOf[Filter] && plan.children.head.isInstanceOf[Project]
+      }
+      assert(filter.isDefined)
+      val exist = filter.get.expressions.flatMap { expr =>
+        expr.collect { case gt: GreaterThan => gt }
+      }
+      if (expectTimeRange) {
+        assert(exist.nonEmpty, assertHintMsg)
+      } else {
+        assert(exist.isEmpty, assertHintMsg)
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org