You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@spark.apache.org by we...@apache.org on 2019/04/09 02:14:02 UTC
[spark] branch master updated: [SPARK-25496][SQL][FOLLOWUP] avoid using to_utc_timestamp
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 051336d [SPARK-25496][SQL][FOLLOWUP] avoid using to_utc_timestamp
051336d is described below
commit 051336d9dd1000df82cae5fa3c1aa6b4559d9fb7
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Apr 9 10:13:38 2019 +0800
[SPARK-25496][SQL][FOLLOWUP] avoid using to_utc_timestamp
## What changes were proposed in this pull request?
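In https://github.com/apache/spark/pull/24195, we deprecated `from_utc_timestamp` and `to_utc_timestamp`.
This PR removes an unnecessary use of `to_utc_timestamp` in a test. The "prune results by current_date, complete mode" test in `StreamingAggregationSuite` now casts the epoch-seconds value directly to a timestamp. As a result, it no longer needs to enable `SQLConf.UTC_TIMESTAMP_FUNC_ENABLED`.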
## How was this patch tested?
This is a test-only PR: the only file changed is a test suite.
Closes #24319 from cloud-fan/minor.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/streaming/StreamingAggregationSuite.scala | 94 +++++++++++-----------
1 file changed, 45 insertions(+), 49 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 2a9e6b8..81b22be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -342,55 +342,51 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
testWithAllStateVersions("prune results by current_date, complete mode") {
- withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
- import testImplicits._
- val clock = new StreamManualClock
- val tz = TimeZone.getDefault.getID
- val inputData = MemoryStream[Long]
- val aggregated =
- inputData.toDF()
- .select(to_utc_timestamp(from_unixtime('value * DateTimeUtils.SECONDS_PER_DAY), tz))
- .toDF("value")
- .groupBy($"value")
- .agg(count("*"))
- .where($"value".cast("date") >= date_sub(current_date(), 10))
- .select(
- ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
- testStream(aggregated, Complete)(
- StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
- // advance clock to 10 days, should retain all keys
- AddData(inputData, 0L, 5L, 5L, 10L),
- AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
- CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
- // advance clock to 20 days, should retain keys >= 10
- AddData(inputData, 15L, 15L, 20L),
- AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
- CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
- // advance clock to 30 days, should retain keys >= 20
- AddData(inputData, 85L),
- AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
- CheckLastBatch((20L, 1), (85L, 1)),
-
- // bounce stream and ensure correct batch timestamp is used
- // i.e., we don't take it from the clock, which is at 90 days.
- StopStream,
- AssertOnQuery { q => // clear the sink
- q.sink.asInstanceOf[MemorySink].clear()
- q.commitLog.purge(3)
- // advance by 60 days i.e., 90 days total
- clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
- true
- },
- StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
- // Commit log blown, causing a re-run of the last batch
- CheckLastBatch((20L, 1), (85L, 1)),
-
- // advance clock to 100 days, should retain keys >= 90
- AddData(inputData, 85L, 90L, 100L, 105L),
- AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
- CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
- )
- }
+ import testImplicits._
+ val clock = new StreamManualClock
+ val inputData = MemoryStream[Long]
+ val aggregated =
+ inputData.toDF()
+ .select(($"value" * DateTimeUtils.SECONDS_PER_DAY).cast("timestamp").as("value"))
+ .groupBy($"value")
+ .agg(count("*"))
+ .where($"value".cast("date") >= date_sub(current_timestamp().cast("date"), 10))
+ .select(
+ ($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
+ testStream(aggregated, Complete)(
+ StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+ // advance clock to 10 days, should retain all keys
+ AddData(inputData, 0L, 5L, 5L, 10L),
+ AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+ CheckLastBatch((0L, 1), (5L, 2), (10L, 1)),
+ // advance clock to 20 days, should retain keys >= 10
+ AddData(inputData, 15L, 15L, 20L),
+ AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+ CheckLastBatch((10L, 1), (15L, 2), (20L, 1)),
+ // advance clock to 30 days, should retain keys >= 20
+ AddData(inputData, 85L),
+ AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+ CheckLastBatch((20L, 1), (85L, 1)),
+
+ // bounce stream and ensure correct batch timestamp is used
+ // i.e., we don't take it from the clock, which is at 90 days.
+ StopStream,
+ AssertOnQuery { q => // clear the sink
+ q.sink.asInstanceOf[MemorySink].clear()
+ q.commitLog.purge(3)
+ // advance by 60 days i.e., 90 days total
+ clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
+ true
+ },
+ StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
+ // Commit log blown, causing a re-run of the last batch
+ CheckLastBatch((20L, 1), (85L, 1)),
+
+ // advance clock to 100 days, should retain keys >= 90
+ AddData(inputData, 85L, 90L, 100L, 105L),
+ AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
+ CheckLastBatch((90L, 1), (100L, 1), (105L, 1))
+ )
}
testWithAllStateVersions("SPARK-19690: do not convert batch aggregation in streaming query " +
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org