Posted to commits@spark.apache.org by td...@apache.org on 2017/07/19 18:02:10 UTC
spark git commit: [SPARK-21464][SS] Minimize deprecation warnings caused by ProcessingTime class
Repository: spark
Updated Branches:
refs/heads/master 6b6dd682e -> 70fe99dc6
[SPARK-21464][SS] Minimize deprecation warnings caused by ProcessingTime class
## What changes were proposed in this pull request?
Use of the `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` in Spark 2.2. However, internal uses of `ProcessingTime` still cause deprecation warnings during compilation. These cannot be avoided entirely: even though it is deprecated as a public API, `ProcessingTime` instances are used internally in `TriggerExecutor`. This PR minimizes the warnings by removing uses of the deprecated class from tests as much as possible.
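For readers unfamiliar with the two APIs, here is a minimal sketch of the migration this change applies throughout the tests (the `oldStyle`/`newStyle` names are illustrative, not from the patch):

```scala
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

// Deprecated since Spark 2.2: constructing ProcessingTime directly
// emits a deprecation warning at compile time.
val oldStyle = ProcessingTime("1 second")

// Recommended replacement: the Trigger.ProcessingTime factory has the
// same semantics but is not deprecated.
val newStyle = Trigger.ProcessingTime("1 second")
```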
## How was this patch tested?
Existing tests.
Author: Tathagata Das <ta...@gmail.com>
Closes #18678 from tdas/SPARK-21464.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/70fe99dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/70fe99dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/70fe99dc
Branch: refs/heads/master
Commit: 70fe99dc62ef636a99bcb8a580ad4de4dca95181
Parents: 6b6dd68
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Jul 19 11:02:07 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jul 19 11:02:07 2017 -0700
----------------------------------------------------------------------
.../apache/spark/sql/ProcessingTimeSuite.scala | 24 +++++++++++---------
.../streaming/FlatMapGroupsWithStateSuite.scala | 6 ++---
.../streaming/StreamingAggregationSuite.scala | 8 +++----
3 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/70fe99dc/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
index 52c2007..623a1b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
@@ -22,20 +22,22 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
class ProcessingTimeSuite extends SparkFunSuite {
test("create") {
- assert(ProcessingTime(10.seconds).intervalMs === 10 * 1000)
- assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs === 10 * 1000)
- assert(ProcessingTime("1 minute").intervalMs === 60 * 1000)
- assert(ProcessingTime("interval 1 minute").intervalMs === 60 * 1000)
-
- intercept[IllegalArgumentException] { ProcessingTime(null: String) }
- intercept[IllegalArgumentException] { ProcessingTime("") }
- intercept[IllegalArgumentException] { ProcessingTime("invalid") }
- intercept[IllegalArgumentException] { ProcessingTime("1 month") }
- intercept[IllegalArgumentException] { ProcessingTime("1 year") }
+ def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs
+
+ assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000)
+ assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000)
+ assert(getIntervalMs(Trigger.ProcessingTime("1 minute")) === 60 * 1000)
+ assert(getIntervalMs(Trigger.ProcessingTime("interval 1 minute")) === 60 * 1000)
+
+ intercept[IllegalArgumentException] { Trigger.ProcessingTime(null: String) }
+ intercept[IllegalArgumentException] { Trigger.ProcessingTime("") }
+ intercept[IllegalArgumentException] { Trigger.ProcessingTime("invalid") }
+ intercept[IllegalArgumentException] { Trigger.ProcessingTime("1 month") }
+ intercept[IllegalArgumentException] { Trigger.ProcessingTime("1 year") }
}
}
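The `getIntervalMs` helper above works because `Trigger.ProcessingTime` still returns a `ProcessingTime` instance; only its static type is the non-deprecated `Trigger`. A minimal sketch of the same downcast outside the suite, assuming Spark 2.2 on the classpath:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

// Trigger.ProcessingTime is typed as Trigger, so reading the parsed
// interval back requires a downcast to the concrete class.
val trigger: Trigger = Trigger.ProcessingTime(10, TimeUnit.SECONDS)
val intervalMs = trigger.asInstanceOf[ProcessingTime].intervalMs // 10000
```

Note that the downcast still references the deprecated class, which is presumably why the suite confines it to a single helper rather than repeating it in every assertion.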
http://git-wip-us.apache.org/repos/asf/spark/blob/70fe99dc/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 9f2f0d1..a5399cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -664,7 +664,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
.flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
testStream(result, Update)(
- StartStream(ProcessingTime("1 second"), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
AddData(inputData, "a"),
AdvanceManualClock(1 * 1000),
CheckLastBatch(("a", "1")),
@@ -729,7 +729,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
.flatMapGroupsWithState(Update, EventTimeTimeout)(stateFunc)
testStream(result, Update)(
- StartStream(ProcessingTime("1 second")),
+ StartStream(Trigger.ProcessingTime("1 second")),
AddData(inputData, ("a", 11), ("a", 13), ("a", 15)), // Set timeout timestamp of ...
CheckLastBatch(("a", 15)), // "a" to 15 + 5 = 20s, watermark to 5s
AddData(inputData, ("a", 4)), // Add data older than watermark for "a"
@@ -901,7 +901,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
.flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
testStream(result, Update)(
- StartStream(ProcessingTime("1 second"), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
AddData(inputData, ("a", 1L)),
AdvanceManualClock(1 * 1000),
CheckLastBatch(("a", "1"))
http://git-wip-us.apache.org/repos/asf/spark/blob/70fe99dc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 4345a70..b6e82b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -267,7 +267,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
.where('value >= current_timestamp().cast("long") - 10L)
testStream(aggregated, Complete)(
- StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
// advance clock to 10 seconds, all keys retained
AddData(inputData, 0L, 5L, 5L, 10L),
@@ -294,7 +294,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
clock.advance(60 * 1000L)
true
},
- StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
// The commit log blown, causing the last batch to re-run
CheckLastBatch((20L, 1), (85L, 1)),
AssertOnQuery { q =>
@@ -322,7 +322,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
.where($"value".cast("date") >= date_sub(current_date(), 10))
.select(($"value".cast("long") / DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
testStream(aggregated, Complete)(
- StartStream(ProcessingTime("10 day"), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
// advance clock to 10 days, should retain all keys
AddData(inputData, 0L, 5L, 5L, 10L),
AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
@@ -346,7 +346,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
true
},
- StartStream(ProcessingTime("10 day"), triggerClock = clock),
+ StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
// Commit log blown, causing a re-run of the last batch
CheckLastBatch((20L, 1), (85L, 1)),