Posted to commits@spark.apache.org by do...@apache.org on 2019/01/28 20:14:20 UTC

[spark] branch branch-2.3 updated: [SPARK-26379][SS][BRANCH-2.3] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new a89f601  [SPARK-26379][SS][BRANCH-2.3] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp
a89f601 is described below

commit a89f601a788ab6f1c89cefb1b4097444fb9847a4
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Mon Jan 28 12:13:51 2019 -0800

    [SPARK-26379][SS][BRANCH-2.3] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp
    
    ## What changes were proposed in this pull request?
    
    In `MicroBatchExecution`, Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp`.
    However, `CurrentBatchTimestamp` is a `TimeZoneAwareExpression` while `CurrentTimestamp` isn't.
    Without a `timeZoneId`, `CurrentBatchTimestamp` stays unresolved and raises `UnresolvedException`.
    
    Since `CurrentDate` is already a `TimeZoneAwareExpression` and carries its own `timeZoneId`, it is not affected.
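    
    For context, a minimal sketch of a query that hits this code path (the `rate` source, column name, and local master are illustrative assumptions; any streaming Dataset that uses `current_timestamp()` goes through this rewrite):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.current_timestamp
    
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-26379-repro")
      .getOrCreate()
    
    // current_timestamp() is planned as CurrentTimestamp; each micro-batch
    // rewrites it to CurrentBatchTimestamp. Before this patch the rewritten
    // expression had no timeZoneId, so analysis failed with UnresolvedException.
    val streamingDF = spark.readStream
      .format("rate") // built-in test source: emits (timestamp, value) rows
      .load()
      .withColumn("cur_timestamp", current_timestamp())
    
    streamingDF.writeStream
      .format("console")
      .start()
      .awaitTermination()
    ```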
    
    ## How was this patch tested?
    
    Passed Jenkins with the updated test cases.
    
    Closes #23656 from HeartSaVioR/SPARK-26379-branch-2.3.
    
    Lead-authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Co-authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../execution/streaming/MicroBatchExecution.scala  |  5 ++-
 .../apache/spark/sql/streaming/StreamSuite.scala   | 42 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6a264ad..dbbb7e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -430,8 +430,11 @@ class MicroBatchExecution(
     // Rewire the plan to use the new attributes that were returned by the source.
     val newAttributePlan = newBatchesPlan transformAllExpressions {
       case ct: CurrentTimestamp =>
+        // CurrentTimestamp is not a TimeZoneAwareExpression, while CurrentBatchTimestamp is.
+        // Without a timeZoneId, CurrentBatchTimestamp stays unresolved. Here, we use an
+        // explicit dummy string to prevent UnresolvedException and to keep it from being used.
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
-          ct.dataType)
+          ct.dataType, Some("Dummy TimeZoneId"))
       case cd: CurrentDate =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           cd.dataType, cd.timeZoneId)
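
For context on why the missing id matters: in Catalyst, a `TimeZoneAwareExpression` only reports itself resolved once a time zone has been set. A simplified sketch of the trait, paraphrased from Spark's `datetimeExpressions.scala` (not part of this diff):

    // Paraphrased sketch: expressions that need a time zone stay unresolved
    // until timeZoneId is defined, which is what made CurrentBatchTimestamp
    // throw UnresolvedException when constructed without one.
    trait TimeZoneAwareExpression extends Expression {
      def timeZoneId: Option[String]

      override lazy val resolved: Boolean =
        childrenResolved && checkInputDataTypes().isSuccess && timeZoneId.isDefined
    }

The dummy id is safe for the timestamp case because the value is the zone-independent batch timestamp; note that the `CurrentDate` branch above still passes the real `cd.timeZoneId`, since converting a timestamp to a date does depend on the zone.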
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c65e5d3..92fdde8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
@@ -825,6 +826,47 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
+
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
+      }
+    )
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {

