You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2021/07/22 14:37:54 UTC

[spark] branch branch-3.2 updated: [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3ee9a0d  [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
3ee9a0d is described below

commit 3ee9a0db3a3eb8e88bbab28207c91bd4637b313a
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Thu Jul 22 17:36:22 2021 +0300

    [SPARK-35815][SQL] Allow delayThreshold for watermark to be represented as ANSI interval literals
    
    ### What changes were proposed in this pull request?
    
    This PR extends the way to represent `delayThreshold` with ANSI interval literals for watermark.
    
    ### Why are the changes needed?
    
    A `delayThreshold` is semantically an interval value so it's should be represented as ANSI interval literals as well as the conventional `1 second` form.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #33456 from sarutak/delayThreshold-interval.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
    (cherry picked from commit 07fa38e2c1082c2b69b3bf9489cee4dfe4db2c26)
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 23 ++++++---
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 54 ++++++++++++++++++++++
 3 files changed, 71 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 7a33d52a..f576036 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2265,7 +2265,7 @@ private[spark] object QueryCompilationErrors {
       s"""Cannot resolve column name "$colName" among (${fieldsStr})${extraMsg}""")
   }
 
-  def cannotParseTimeDelayError(delayThreshold: String, e: IllegalArgumentException): Throwable = {
+  def cannotParseTimeDelayError(delayThreshold: String, e: Throwable): Throwable = {
     new AnalysisException(s"Unable to parse time delay '$delayThreshold'", cause = Some(e))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0fd10c1..3abc060 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashSet}
@@ -42,7 +43,7 @@ import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
-import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
+import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
@@ -63,7 +64,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.array.ByteArrayMethods
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
 import org.apache.spark.util.Utils
 
 private[sql] object Dataset {
@@ -739,13 +740,21 @@ class Dataset[T] private[sql](
   // We only accept an existing column name, not a derived column here as a watermark that is
   // defined on a derived column cannot referenced elsewhere in the plan.
   def withWatermark(eventTime: String, delayThreshold: String): Dataset[T] = withTypedPlan {
-    val parsedDelay =
-      try {
+    val parsedDelay = try {
+      if (delayThreshold.toLowerCase(Locale.ROOT).trim.startsWith("interval")) {
+        CatalystSqlParser.parseExpression(delayThreshold) match {
+          case Literal(months: Int, _: YearMonthIntervalType) =>
+            new CalendarInterval(months, 0, 0)
+          case Literal(micros: Long, _: DayTimeIntervalType) =>
+            new CalendarInterval(0, 0, micros)
+        }
+      } else {
         IntervalUtils.stringToInterval(UTF8String.fromString(delayThreshold))
-      } catch {
-        case e: IllegalArgumentException =>
-          throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
       }
+    } catch {
+      case NonFatal(e) =>
+        throw QueryCompilationErrors.cannotParseTimeDelayError(delayThreshold, e)
+    }
     require(!IntervalUtils.isNegative(parsedDelay),
       s"delay threshold ($delayThreshold) should not be negative.")
     EliminateEventTimeWatermark(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 67ab72a..2724153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
+import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
@@ -765,6 +766,59 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     }
   }
 
+  test("SPARK-35815: Support ANSI intervals for delay threshold") {
+    val DAYS_PER_MONTH = 31
+    Seq(
+      // Conventional form and some variants
+      (Seq("3 days", "Interval 3 day", "inTerval '3' day"), 3 * MILLIS_PER_DAY),
+      (Seq(" 5 hours", "INTERVAL 5 hour", "interval '5' hour"), 5 * MILLIS_PER_HOUR),
+      (Seq("\t8 minutes", "interval 8 minute", "interval '8' minute"), 8 * MILLIS_PER_MINUTE),
+      (Seq("10 seconds", "interval 10 second", "interval '10' second"), 10 * MILLIS_PER_SECOND),
+      (Seq("1 years", "interval 1 year", "interval '1' year"),
+        MONTHS_PER_YEAR * DAYS_PER_MONTH * MILLIS_PER_DAY),
+      (Seq("1 months", "interval 1 month", "interval '1' month"), DAYS_PER_MONTH * MILLIS_PER_DAY),
+      (Seq(
+        "1 day 2 hours 3 minutes 4 seconds",
+        " interval 1 day 2 hours 3 minutes 4 seconds",
+        "\tinterval '1' day '2' hours '3' minutes '4' seconds",
+        "interval '1 2:3:4' day to second"),
+        MILLIS_PER_DAY + 2 * MILLIS_PER_HOUR + 3 * MILLIS_PER_MINUTE + 4 * MILLIS_PER_SECOND),
+      (Seq(
+        " 1 year 2 months",
+        "interval 1 year 2 month",
+        "interval '1' year '2' month",
+        "\tinterval '1-2' year to month"),
+        (MONTHS_PER_YEAR * DAYS_PER_MONTH + 2 * DAYS_PER_MONTH) * MILLIS_PER_DAY)
+    ).foreach { case (delayThresholdVariants, expectedMs) =>
+      delayThresholdVariants.foreach { case delayThreshold =>
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+        val eventTimeAttr = df.queryExecution.analyzed.output.find(a => a.name == "eventTime")
+        assert(eventTimeAttr.isDefined)
+        val metadata = eventTimeAttr.get.metadata
+        assert(metadata.contains(EventTimeWatermark.delayKey))
+        assert(metadata.getLong(EventTimeWatermark.delayKey) === expectedMs)
+      }
+    }
+
+    // Invalid interval patterns
+    Seq(
+      "1 foo",
+      "interva 2 day",
+      "intrval '3' day",
+      "interval 4 foo",
+      "interval '5' foo",
+      "interval '1 2:3:4' day to hour",
+      "interval '1 2' year to month").foreach { delayThreshold =>
+      intercept[AnalysisException] {
+        val df = MemoryStream[Int].toDF
+          .withColumn("eventTime", timestamp_seconds($"value"))
+          .withWatermark("eventTime", delayThreshold)
+      }
+    }
+  }
+
   private def dfWithMultipleWatermarks(
       input1: MemoryStream[Int],
       input2: MemoryStream[Int]): Dataset[_] = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org