Posted to commits@spark.apache.org by ma...@apache.org on 2021/07/22 14:37:54 UTC
[spark] branch branch-3.2 updated: [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 3ee9a0d [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
3ee9a0d is described below
commit 3ee9a0db3a3eb8e88bbab28207c91bd4637b313a
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Thu Jul 22 17:36:22 2021 +0300
[SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
### What changes were proposed in this pull request?
This PR extends `withWatermark` so that `delayThreshold` can also be specified as an ANSI interval literal.
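For illustration, both forms below are accepted after this change (a minimal sketch against a local `SparkSession`; the rate source and the column names are just for the example):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: a streaming DataFrame from the built-in rate source,
// which produces `timestamp` and `value` columns.
val spark = SparkSession.builder().master("local[*]").appName("watermark-demo").getOrCreate()

val events = spark.readStream
  .format("rate")
  .load()
  .withColumnRenamed("timestamp", "eventTime")

// Conventional form, already supported:
events.withWatermark("eventTime", "10 seconds")

// ANSI interval literals, enabled by this change:
events.withWatermark("eventTime", "INTERVAL '10' SECOND")
events.withWatermark("eventTime", "INTERVAL '1 2:3:4' DAY TO SECOND")
```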
### Why are the changes needed?
A `delayThreshold` is semantically an interval value, so it should be representable as an ANSI interval literal as well as in the conventional `1 second` form.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New tests in `EventTimeWatermarkSuite`.
Closes #33456 from sarutak/delayThreshold-interval.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
(cherry picked from commit 07fa38e2c1082c2b69b3bf9489cee4dfe4db2c26)
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../spark/sql/errors/QueryCompilationErrors.scala | 2 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 23 ++++++---
.../sql/streaming/EventTimeWatermarkSuite.scala | 54 ++++++++++++++++++++++
3 files changed, 71 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 7a33d52a..f576036 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2265,7 +2265,7 @@ private[spark] object QueryCompilationErrors {
s"""Cannot resolve column name "$colName" among (${fieldsStr})${extraMsg}""")
}
- def cannotParseTimeDelayError(delayThreshold: String, e: IllegalArgumentException): Throwable = {
+ def cannotParseTimeDelayError(delayThreshold: String, e: Throwable): Throwable = {
new AnalysisException(s"Unable to parse time delay '$delayThreshold'", cause = Some(e))
}
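The `cause` parameter is widened from `IllegalArgumentException` to `Throwable` because the new code path parses ANSI literals with `CatalystSqlParser`, which reports malformed input with a `ParseException` rather than an `IllegalArgumentException`. A sketch of the two failure modes (the input strings are illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.unsafe.types.UTF8String

// The legacy multi-unit parser signals bad input with IllegalArgumentException ...
try IntervalUtils.stringToInterval(UTF8String.fromString("1 foo"))
catch { case e: IllegalArgumentException => println(e.getMessage) }

// ... while the SQL expression parser throws ParseException for a bad literal.
try CatalystSqlParser.parseExpression("interval '5' foo")
catch { case e: ParseException => println(e.getMessage) }
```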
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0fd10c1..3abc060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
+import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashSet}
@@ -42,7 +43,7 @@ import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
@@ -63,7 +64,7 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.array.ByteArrayMethods
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
import org.apache.spark.util.Utils
private[sql] object Dataset {
@@ -739,13 +740,21 @@ class Dataset[T] private[sql](
// We only accept an existing column name, not a derived column here as a watermark that is
defined on a derived column cannot be referenced elsewhere in the plan.
def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
- val parsedDelay =
- try {
+ val parsedDelay = try {
+ if (delayThreshold.toLowerCase(Locale.ROOT).trim.startsWith("interval")) {
+ CatalystSqlParser.parseExpression(delayThreshold) match {
+ case Literal(months: Int, _: YearMonthIntervalType) =>
+ new CalendarInterval(months, 0, 0)
+ case Literal(micros: Long, _: DayTimeIntervalType) =>
+ new CalendarInterval(0, 0, micros)
+ }
+ } else {
IntervalUtils.stringToInterval(UTF8String.fromString(delayThreshold))
- } catch {
- case e: IllegalArgumentException =>
- throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
}
+ } catch {
+ case NonFatal(e) =>
+ throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
+ }
require(!IntervalUtils.isNegative(parsedDelay),
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
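For reference, the dispatch above can be exercised on its own in a Spark 3.2 `spark-shell` (a sketch; the helper name `parseDelay` is hypothetical):

```scala
import java.util.Locale

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.IntervalUtils
import org.apache.spark.sql.types.{DayTimeIntervalType, YearMonthIntervalType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

// Mirrors the logic added above: ANSI literals go through the SQL expression
// parser, everything else through the legacy multi-unit parser.
def parseDelay(delayThreshold: String): CalendarInterval = {
  if (delayThreshold.toLowerCase(Locale.ROOT).trim.startsWith("interval")) {
    CatalystSqlParser.parseExpression(delayThreshold) match {
      case Literal(months: Int, _: YearMonthIntervalType) => new CalendarInterval(months, 0, 0)
      case Literal(micros: Long, _: DayTimeIntervalType) => new CalendarInterval(0, 0, micros)
    }
  } else {
    IntervalUtils.stringToInterval(UTF8String.fromString(delayThreshold))
  }
}

parseDelay("10 seconds")                   // CalendarInterval: 10 seconds
parseDelay("interval '10' second")         // CalendarInterval: 10 seconds
parseDelay("interval '1-2' year to month") // CalendarInterval: 1 year 2 months
```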
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 67ab72a..2724153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.matchers.should.Matchers._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -765,6 +766,59 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}
}
+ test("SPARK-35815: Support ANSI intervals for delay threshold") {
+ val DAYS_PER_MONTH = 31
+ Seq(
+ // Conventional form and some variants
+ (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 3 * MILLIS_PER_DAY),
+ (Seq(" 5 hours", "INTERVAL 5 hour", "interval '5' hour"), 5 * MILLIS_PER_HOUR),
+ (Seq("\t8 minutes", "interval 8 minute", "interval '8' minute"), 8 * MILLIS_PER_MINUTE),
+ (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10 * MILLIS_PER_SECOND),
+ (Seq("1 years", "interval 1 year", "interval '1' year"),
+ MONTHS_PER_YEAR * DAYS_PER_MONTH * MILLIS_PER_DAY),
+ (Seq("1 months", "interval 1 month", "interval '1' month"), DAYS_PER_MONTH * MILLIS_PER_DAY),
+ (Seq(
+ "1 day 2 hours 3 minutes 4 seconds",
+ " interval 1 day 2 hours 3 minutes 4 seconds",
+ "\tinterval '1' day '2' hours '3' minutes '4' seconds",
+ "interval '1 2:3:4' day to second"),
+ MILLIS_PER_DAY + 2 * MILLIS_PER_HOUR + 3 * MILLIS_PER_MINUTE + 4 * MILLIS_PER_SECOND),
+ (Seq(
+ " 1 year 2 months",
+ "interval 1 year 2 month",
+ "interval '1' year '2' month",
+ "\tinterval '1-2' year to month"),
+ (MONTHS_PER_YEAR * DAYS_PER_MONTH + 2 * DAYS_PER_MONTH) * MILLIS_PER_DAY)
+ ).foreach { case (delayThresholdVariants, expectedMs) =>
+ delayThresholdVariants.foreach { case delayThreshold =>
+ val df = MemoryStream[Int].toDF
+ .withColumn("eventTime", timestamp_seconds($"value"))
+ .withWatermark("eventTime", delayThreshold)
+ val eventTimeAttr = df.queryExecution.analyzed.output.find(a => a.name == "eventTime")
+ assert(eventTimeAttr.isDefined)
+ val metadata = eventTimeAttr.get.metadata
+ assert(metadata.contains(EventTimeWatermark.delayKey))
+ assert(metadata.getLong(EventTimeWatermark.delayKey) === expectedMs)
+ }
+ }
+
+ // Invalid interval patterns
+ Seq(
+ "1 foo",
+ "interva 2 day",
+ "intrval '3' day",
+ "interval 4 foo",
+ "interval '5' foo",
+ "interval '1 2:3:4' day to hour",
+ "interval '1 2' year to month").foreach { delayThreshold =>
+ intercept[AnalysisException] {
+ val df = MemoryStream[Int].toDF
+ .withColumn("eventTime", timestamp_seconds($"value"))
+ .withWatermark("eventTime", delayThreshold)
+ }
+ }
+ }
+
private def dfWithMultipleWatermarks(
input1: MemoryStream[Int],
input2: MemoryStream[Int]): Dataset[_] = {