You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2022/03/30 03:02:51 UTC
[spark] branch master updated: [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a445536 [SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0
a445536 is described below
commit a44553648f75e9243b8a7dc27185ae6901f35f94
Author: nyingping <sm...@163.com>
AuthorDate: Wed Mar 30 12:00:40 2022 +0900
[SPARK-38349][SS] No need to filter events when sessionwindow gapDuration greater than 0
### What changes were proposed in this pull request?
With a static gapDuration on a session window, there is no need to filter events when the session window gapDuration is greater than 0.
### Why are the changes needed?
Saves computation resources and improves performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new unit test and a benchmark.
A simple benchmark on [9dae8a5](https://github.com/nyingping/spark/commit/9dae8a555e82a59d2160bbb4518704cec81b219e). Thanks again to [HeartSaVioR](https://github.com/HeartSaVioR/spark/commit/d532b6f6bcdd80cdaac520b21587ebb69ff2df8f).
---------------------------------------
case 1
---------------------------------------
```
spark.range(numOfRow)
.selectExpr("CAST(id AS timestamp) AS time")
.select(session_window(col("time"), "10 seconds"))
.count()
```
Result:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_291-b10 on Windows 10 10.0
AMD64 Family 23 Model 96 Stepping 1, AuthenticAMD
session windows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
old logic 1688 1730 61 5.9 168.8 1.0X
new logic 21 26 5 487.3 2.1 82.3X
```
Closes #35680 from nyingping/SPARK-38349.
Lead-authored-by: nyingping <sm...@163.com>
Co-authored-by: Nie yingping <sm...@163.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 16 +++++--
.../spark/sql/DataFrameSessionWindowingSuite.scala | 56 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 5 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6d95067..f69f17d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -57,7 +57,7 @@ import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType.DAY
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils
/**
@@ -4058,10 +4058,20 @@ object SessionWindowing extends Rule[LogicalPlan] {
case s: SessionWindow => sessionAttr
}
+ val filterByTimeRange = session.gapDuration match {
+ case Literal(interval: CalendarInterval, CalendarIntervalType) =>
+ interval == null || interval.months + interval.days + interval.microseconds <= 0
+ case _ => true
+ }
+
// As same as tumbling window, we add a filter to filter out nulls.
// And we also filter out events with negative or zero or invalid gap duration.
- val filterExpr = IsNotNull(session.timeColumn) &&
- (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
+ val filterExpr = if (filterByTimeRange) {
+ IsNotNull(session.timeColumn) &&
+ (sessionAttr.getField(SESSION_END) > sessionAttr.getField(SESSION_START))
+ } else {
+ IsNotNull(session.timeColumn)
+ }
replacedPlan.withNewChildren(
Filter(filterExpr,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
index a5414f3..4c2d0f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSessionWindowingSuite.scala
@@ -22,8 +22,8 @@ import java.time.LocalDateTime
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -495,4 +495,56 @@ class DataFrameSessionWindowingSuite extends QueryTest with SharedSparkSession
validateWindowColumnInSchema(schema2, "session")
}
}
+
+ test("SPARK-38349: No need to filter events when gapDuration greater than 0") {
+ // negative gap duration
+ check("-5 seconds", true, "Need to filter events when gap duration less than 0")
+
+ // positive gap duration
+ check("5 seconds", false, "No need to filter events when gap duration greater than 0")
+
+ // invalid gap duration
+ check("x seconds", true, "Need to filter events when gap duration invalid")
+
+ // dynamic gap duration
+ check(when(col("time").equalTo("1"), "5 seconds")
+ .when(col("time").equalTo("2"), "10 seconds")
+ .otherwise("10 seconds"), true, "Need to filter events when gap duration dynamically")
+
+ def check(
+ gapDuration: Any,
+ expectTimeRange: Boolean,
+ assertHintMsg: String): Unit = {
+ val data = Seq(
+ ("2016-03-27 19:39:30", 1, "a")).toDF("time", "value", "id")
+ val df = if (gapDuration.isInstanceOf[String]) {
+ data.groupBy(session_window($"time", gapDuration.asInstanceOf[String]))
+ } else {
+ data.groupBy(session_window($"time", gapDuration.asInstanceOf[Column]))
+ }
+ val aggregate = df.agg(count("*").as("counts"))
+ .select($"session_window.start".cast("string"), $"session_window.end".cast("string"),
+ $"counts")
+
+ checkFilterCondition(aggregate.queryExecution.logical, expectTimeRange, assertHintMsg)
+ }
+
+ def checkFilterCondition(
+ logicalPlan: LogicalPlan,
+ expectTimeRange: Boolean,
+ assertHintMsg: String): Unit = {
+ val filter = logicalPlan.find { plan =>
+ plan.isInstanceOf[Filter] && plan.children.head.isInstanceOf[Project]
+ }
+ assert(filter.isDefined)
+ val exist = filter.get.expressions.flatMap { expr =>
+ expr.collect { case gt: GreaterThan => gt }
+ }
+ if (expectTimeRange) {
+ assert(exist.nonEmpty, assertHintMsg)
+ } else {
+ assert(exist.isEmpty, assertHintMsg)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org