Posted to commits@spark.apache.org by we...@apache.org on 2019/04/09 02:14:02 UTC

[spark] branch master updated: [SPARK-25496][SQL][FOLLOWUP] avoid using to_utc_timestamp

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 051336d  [SPARK-25496][SQL][FOLLOWUP] avoid using to_utc_timestamp
051336d is described below

commit 051336d9dd1000df82cae5fa3c1aa6b4559d9fb7
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Apr 9 10:13:38 2019 +0800

    [SPARK-25496][SQL][FOLLOWUP] avoid using to_utc_timestamp
    
    ## What changes were proposed in this pull request?
    
    In https://github.com/apache/spark/pull/24195, we deprecated `from/to_utc_timestamp`.
    
    This PR removes an unnecessary use of `to_utc_timestamp` in that test.
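    
    A minimal sketch of the substitution (not part of this commit; the session setup, the `secondsPerDay` constant, and the sample data below are illustrative assumptions): the day index is cast straight to a timestamp instead of going through `from_unixtime` plus the deprecated `to_utc_timestamp`.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    
    object CastInsteadOfUtcTimestamp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        import spark.implicits._
    
        val secondsPerDay = 86400L // stands in for DateTimeUtils.SECONDS_PER_DAY
        val days = Seq(0L, 5L, 10L).toDF("value") // day indices, as in the test
    
        // Old shape (deprecated path), shown for comparison only:
        //   days.select(to_utc_timestamp(from_unixtime($"value" * secondsPerDay), tz))
    
        // New shape used by the test: cast the epoch seconds directly to a timestamp.
        val ts = days.select(($"value" * secondsPerDay).cast("timestamp").as("value"))
        ts.show(truncate = false)
    
        spark.stop()
      }
    }
    ```
    
    The direct cast should yield the same per-day grouping keys while avoiding the deprecated time-zone-shifting functions, which is also why the `UTC_TIMESTAMP_FUNC_ENABLED` conf wrapper can be dropped from the test.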
    
    ## How was this patch tested?
    
    Test-only PR.
    
    Closes #24319 from cloud-fan/minor.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/streaming/StreamingAggregationSuite.scala  | 94 +++++++++++-----------
 1 file changed, 45 insertions(+), 49 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 2a9e6b8..81b22be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -342,55 +342,51 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
-      import testImplicits._
-      val clock = new StreamManualClock
-      val tz = TimeZone.getDefault.getID
-      val inputData = MemoryStream[Long]
-      val aggregated =
-        inputData.toDF()
-          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-          .toDF("value")
-          .groupBy($"value")
-          .agg(count("*"))
-          .where($"value".cast("date") >= date_sub(current_date(), 10))
-          .select(
-            ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-      testStream(aggregated, Complete)(
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // advance clock to 10 days, should retain all keys
-        AddData(inputData, 0L, 5L, 5L, 10L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-        // advance clock to 20 days, should retain keys >= 10
-        AddData(inputData, 15L, 15L, 20L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-        // advance clock to 30 days, should retain keys >= 20
-        AddData(inputData, 85L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // bounce stream and ensure correct batch timestamp is used
-        // i.e., we don't take it from the clock, which is at 90 days.
-        StopStream,
-        AssertOnQuery { q => // clear the sink
-          q.sink.asInstanceOf[MemorySink].clear()
-          q.commitLog.purge(3)
-          // advance by 60 days i.e., 90 days total
-          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-          true
-        },
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // Commit log blown, causing a re-run of the last batch
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // advance clock to 100 days, should retain keys >= 90
-        AddData(inputData, 85L, 90L, 100L, 105L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-      )
-    }
+    import testImplicits._
+    val clock = new StreamManualClock
+    val inputData = MemoryStream[Long]
+    val aggregated =
+      inputData.toDF()
+        .select(($"value" * DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
+        .groupBy($"value")
+        .agg(count("*"))
+        .where($"value".cast("date") >= date_sub(current_timestamp().cast("date"), 10))
+        .select(
+          ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+    testStream(aggregated, Complete)(
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+      // advance clock to 10 days, should retain all keys
+      AddData(inputData, 0L, 5L, 5L, 10L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+      // advance clock to 20 days, should retain keys >= 10
+      AddData(inputData, 15L, 15L, 20L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+      // advance clock to 30 days, should retain keys >= 20
+      AddData(inputData, 85L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // bounce stream and ensure correct batch timestamp is used
+      // i.e., we don't take it from the clock, which is at 90 days.
+      StopStream,
+      AssertOnQuery { q => // clear the sink
+        q.sink.asInstanceOf[MemorySink].clear()
+        q.commitLog.purge(3)
+        // advance by 60 days i.e., 90 days total
+        clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+        true
+      },
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+      // Commit log blown, causing a re-run of the last batch
+      CheckLastBatch((20L, 1), (85L, 1)),
+
+      // advance clock to 100 days, should retain keys >= 90
+      AddData(inputData, 85L, 90L, 100L, 105L),
+      AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+      CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+    )
   }
 
   testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in streaming query " +

