You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/08 14:12:35 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #24319: [SPARK-25496][SQL][followup] avoid using to_utc_timestamp

cloud-fan commented on a change in pull request #24319: [SPARK-25496][SQL][followup] avoid using to_utc_timestamp
URL: https://github.com/apache/spark/pull/24319#discussion_r273067529
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
 ##########
 @@ -342,55 +342,52 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
   }
 
   testWithAllStateVersions("prune results by current_date, complete mode") {
-    withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
-      import testImplicits._
-      val clock = new StreamManualClock
-      val tz = TimeZone.getDefault.getID
-      val inputData = MemoryStream[Long]
-      val aggregated =
-        inputData.toDF()
-          .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
-          .toDF("value")
-          .groupBy($"value")
-          .agg(count("*"))
-          .where($"value".cast("date") >= date_sub(current_date(), 10))
-          .select(
-            ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
-      testStream(aggregated, Complete)(
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // advance clock to 10 days, should retain all keys
-        AddData(inputData, 0L, 5L, 5L, 10L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
-        // advance clock to 20 days, should retain keys >= 10
-        AddData(inputData, 15L, 15L, 20L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
-        // advance clock to 30 days, should retain keys >= 20
-        AddData(inputData, 85L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // bounce stream and ensure correct batch timestamp is used
-        // i.e., we don't take it from the clock, which is at 90 days.
-        StopStream,
-        AssertOnQuery { q => // clear the sink
-          q.sink.asInstanceOf[MemorySink].clear()
-          q.commitLog.purge(3)
-          // advance by 60 days i.e., 90 days total
-          clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
-          true
-        },
-        StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
-        // Commit log blown, causing a re-run of the last batch
-        CheckLastBatch((20L, 1), (85L, 1)),
-
-        // advance clock to 100 days, should retain keys >= 90
-        AddData(inputData, 85L, 90L, 100L, 105L),
-        AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
-        CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
-      )
-    }
+    import testImplicits._
+    val clock = new StreamManualClock
+    val tz = TimeZone.getDefault.getID
+    val inputData = MemoryStream[Long]
+    val aggregated =
+      inputData.toDF()
+        .select(($"value" * DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
 
 Review comment:
   There is one more change: I replaced `current_date` with `current_timestamp().cast("date")`.
   
   `current_date` returns the date in UTC, so we can't compare it with the result of casting a timestamp column to date. There is a time zone shift when casting between date and timestamp.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org