You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/28 20:14:20 UTC
[spark] branch branch-2.3 updated: [SPARK-26379][SS][BRANCH-2.3]
Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new a89f601 [SPARK-26379][SS][BRANCH-2.3] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp
a89f601 is described below
commit a89f601a788ab6f1c89cefb1b4097444fb9847a4
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Mon Jan 28 12:13:51 2019 -0800
[SPARK-26379][SS][BRANCH-2.3] Use dummy TimeZoneId to avoid UnresolvedException in CurrentBatchTimestamp
## What changes were proposed in this pull request?
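In streaming queries, Spark replaces `CurrentTimestamp` with `CurrentBatchTimestamp` for each micro-batch.
However, `CurrentBatchTimestamp` is a `TimeZoneAwareExpression`, while `CurrentTimestamp` is not.
Without a time zone ID, the resulting `CurrentBatchTimestamp` stays unresolved and the query fails with `UnresolvedException`.
This patch passes an explicit dummy time zone ID when performing that replacement, so the expression resolves.
`CurrentDate` is not affected: it is already a `TimeZoneAwareExpression`, and its own time zone ID is carried over to `CurrentBatchTimestamp`.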
## How was this patch tested?
Passed Jenkins, including the new test cases added to `StreamSuite`.
Closes #23656 from HeartSaVioR/SPARK-26379-branch-2.3.
Lead-authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Co-authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../execution/streaming/MicroBatchExecution.scala | 5 ++-
.../apache/spark/sql/streaming/StreamSuite.scala | 42 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6a264ad..dbbb7e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -430,8 +430,11 @@ class MicroBatchExecution(
// Rewire the plan to use the new attributes that were returned by the source.
val newAttributePlan = newBatchesPlan transformAllExpressions {
case ct: CurrentTimestamp =>
+ // CurrentTimestamp is not TimeZoneAwareExpression while CurrentBatchTimestamp is.
+ // Without TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
+ // dummy string to prevent UnresolvedException and to prevent to be used in the future.
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
- ct.dataType)
+ ct.dataType, Some("Dummy TimeZoneId"))
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index c65e5d3..92fdde8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
@@ -825,6 +826,47 @@ class StreamSuite extends StreamTest {
assert(query.exception.isEmpty)
}
}
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
+    "to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true) // v2 sink variant of the regression
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp " +
+    "to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false) // v1 sink variant of the regression
+  }
+
+  // Checks that current_timestamp() resolves in every micro-batch and never goes backwards
+  // across batches; the sleep between batches lets the wall clock visibly advance.
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS().withColumn("cur_timestamp", lit(current_timestamp()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 1)
+      },
+      Execute { _ => Thread.sleep(1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, 2)
+      }
+    )
+  }
}
abstract class FakeSource extends StreamSourceProvider {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org