You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/25 23:33:51 UTC

[spark] branch branch-2.4 updated: [SPARK-26379][SS] Fix issue on adding current_timestamp/current_date to streaming query

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 46a9018  [SPARK-26379][SS] Fix issue on adding current_timestamp/current_date to streaming query
46a9018 is described below

commit 46a9018b3b29c36f33e4113984a7f43f91ac12fc
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Fri Jan 25 14:58:03 2019 -0800

    [SPARK-26379][SS] Fix issue on adding current_timestamp/current_date to streaming query
    
    ## What changes were proposed in this pull request?
    
    This patch fixes an issue that occurs when `current_timestamp` / `current_date` is added to a streaming query.
    
    The root cause is that Spark transforms `CurrentTimestamp`/`CurrentDate` to `CurrentBatchTimestamp` in MicroBatchExecution, which leaves the transformed attributes unresolved. They are resolved later by IncrementalExecution.
    (In ContinuousExecution, Spark doesn't allow using `current_timestamp` and `current_date` so it has been OK.)
    
    This is fine for a DataSource V1 sink, because it simply uses the transformed logical plan and doesn't evaluate the attributes until they are resolved. For a DataSource V2 sink, however, Spark tries to extract the schema of the transformed logical plan prior to IncrementalExecution, and the unresolved attributes raise errors.
    
    This patch fixes the issue by building a separate, pre-resolved logical plan whose schema can be passed safely to `StreamWriteSupport`.
    
    ## How was this patch tested?
    
    Added unit tests to StreamSuite covering both the V1 and V2 sinks.
    
    Closes #23609 from HeartSaVioR/SPARK-26379.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../execution/streaming/MicroBatchExecution.scala  | 10 ++++-
 .../apache/spark/sql/streaming/StreamSuite.scala   | 46 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7a007b6..e58182c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -496,12 +496,20 @@ class MicroBatchExecution(
           cd.dataType, cd.timeZoneId)
     }
 
+    // Pre-resolve new attributes to ensure all attributes are resolved before
+    // accessing schema of logical plan. Note that it only leverages the information
+    // of attributes, so we don't need to be concerned about the value of literals.
+
+    val newAttrPlanPreResolvedForSchema = newAttributePlan transformAllExpressions {
+      case cbt: CurrentBatchTimestamp => cbt.toLiteral
+    }
+
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: StreamWriteSupport =>
         val writer = s.createStreamWriter(
           s"$runId",
-          newAttributePlan.schema,
+          newAttrPlanPreResolvedForSchema.schema,
           outputMode,
           new DataSourceOptions(extraOptions.asJava))
         WriteToDataSourceV2(new MicroBatchWriter(currentBatchId, writer), newAttributePlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f55ddb5..766bee6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -1082,6 +1083,51 @@ class StreamSuite extends StreamTest {
       assert(query.exception.isEmpty)
     }
   }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v2 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = true)
+  }
+
+  test("SPARK-26379 Structured Streaming - Exception on adding current_timestamp / current_date" +
+    " to Dataset - use v1 sink") {
+    testCurrentTimestampOnStreamingQuery(useV2Sink = false)
+  }
+
+  private def testCurrentTimestampOnStreamingQuery(useV2Sink: Boolean): Unit = {
+    val input = MemoryStream[Int]
+    val df = input.toDS()
+      .withColumn("cur_timestamp", lit(current_timestamp()))
+      .withColumn("cur_date", lit(current_date()))
+
+    def assertBatchOutputAndUpdateLastTimestamp(
+        rows: Seq[Row],
+        curTimestamp: Long,
+        curDate: Int,
+        expectedValue: Int): Long = {
+      assert(rows.size === 1)
+      val row = rows.head
+      assert(row.getInt(0) === expectedValue)
+      assert(row.getTimestamp(1).getTime >= curTimestamp)
+      val days = DateTimeUtils.millisToDays(row.getDate(2).getTime)
+      assert(days == curDate || days == curDate + 1)
+      row.getTimestamp(1).getTime
+    }
+
+    var lastTimestamp = System.currentTimeMillis()
+    val currentDate = DateTimeUtils.millisToDays(lastTimestamp)
+    testStream(df, useV2Sink = useV2Sink) (
+      AddData(input, 1),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 1)
+      },
+      Execute { _ => Thread.sleep(1000) },
+      AddData(input, 2),
+      CheckLastBatch { rows: Seq[Row] =>
+        lastTimestamp = assertBatchOutputAndUpdateLastTimestamp(rows, lastTimestamp, currentDate, 2)
+      }
+    )
+  }
 }
 
 abstract class FakeSource extends StreamSourceProvider {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org