Posted to commits@spark.apache.org by js...@apache.org on 2018/04/28 01:56:03 UTC

spark git commit: [SPARK-23688][SS] Refactor tests away from rate source

Repository: spark
Updated Branches:
  refs/heads/master 8614edd44 -> 1fb46f30f


[SPARK-23688][SS] Refactor tests away from rate source

## What changes were proposed in this pull request?

Replace the rate source with a memory source (ContinuousMemoryStream) in the continuous mode test suite. The "rate" source is kept where a test needs data produced periodically in the background, or needs to load a source by a short provider name, since "memory" does not register a source provider.
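
For example, the refactored "map" test (shown in full in the diff below) now takes this shape, assuming the suite's existing imports for ContinuousMemoryStream and the testStream DSL:

```scala
val input = ContinuousMemoryStream[Int]
// Map over the typed values; the memory stream yields Int rows.
val df = input.toDF().map(_.getInt(0) * 2)

testStream(df)(
  AddData(input, 0, 1),
  CheckAnswer(0, 2),
  StopStream,
  AddData(input, 2, 3, 4),
  StartStream(),
  CheckAnswer(0, 2, 4, 6, 8))
```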

## How was this patch tested?

Ran the relevant test suite from the IDE.

Author: Jungtaek Lim <ka...@gmail.com>

Closes #21152 from HeartSaVioR/SPARK-23688.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fb46f30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fb46f30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fb46f30

Branch: refs/heads/master
Commit: 1fb46f30f83e4751169ff288ad406f26b7c11f7e
Parents: 8614edd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sat Apr 28 09:55:56 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Sat Apr 28 09:55:56 2018 +0800

----------------------------------------------------------------------
 .../streaming/continuous/ContinuousSuite.scala  | 163 +++++++------------
 1 file changed, 61 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fb46f30/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index c318b95..5f222e7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -75,73 +75,50 @@ class ContinuousSuite extends ContinuousSuiteBase {
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0, 1),
+      CheckAnswer(0, 2),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer(0, 2, 4, 6, 8))
   }
 
   test("flatMap") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0, 1),
+      CheckAnswer((0 to 1).flatMap(n => Seq(0, n, n * 2)): _*),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer((0 to 4).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .where('value > 5)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().where('value > 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0, 1),
+      CheckAnswer(),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer(3, 4))
   }
 
   test("deduplicate") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .dropDuplicates()
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().dropDuplicates()
 
     val except = intercept[AnalysisException] {
-      testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger))
+      testStream(df)(StartStream())
     }
 
     assert(except.message.contains(
@@ -149,15 +126,11 @@ class ContinuousSuite extends ContinuousSuiteBase {
   }
 
   test("timestamp") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select(current_timestamp())
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().select(current_timestamp())
 
     val except = intercept[AnalysisException] {
-      testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger))
+      testStream(df)(StartStream())
     }
 
     assert(except.message.contains(
@@ -165,58 +138,43 @@ class ContinuousSuite extends ContinuousSuiteBase {
   }
 
   test("subquery alias") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .createOrReplaceTempView("rate")
-    val test = spark.sql("select value from rate where value > 5")
+    val input = ContinuousMemoryStream[Int]
+    input.toDF().createOrReplaceTempView("memory")
+    val test = spark.sql("select value from memory where value > 2")
 
-    testStream(test, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+    testStream(test)(
+      AddData(input, 0, 1),
+      CheckAnswer(),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer(3, 4))
   }
 
   test("repeatedly restart") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF()
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 10).map(Row(_))),
+    testStream(df)(
+      StartStream(),
+      AddData(input, 0, 1),
+      CheckAnswer(0, 1),
       StopStream,
-      StartStream(longContinuousTrigger),
+      StartStream(),
       StopStream,
-      StartStream(longContinuousTrigger),
+      StartStream(),
       StopStream,
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(2),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).map(Row(_))),
+      StartStream(),
+      StopStream,
+      AddData(input, 2, 3),
+      StartStream(),
+      CheckAnswer(0, 1, 2, 3),
       StopStream)
   }
 
   test("task failure kills the query") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF()
 
     // Get an arbitrary task from this query to kill. It doesn't matter which one.
     var taskId: Long = -1
@@ -227,9 +185,9 @@ class ContinuousSuite extends ContinuousSuiteBase {
     }
     spark.sparkContext.addSparkListener(listener)
     try {
-      testStream(df, useV2Sink = true)(
+      testStream(df)(
         StartStream(Trigger.Continuous(100)),
-        Execute(waitForRateSourceTriggers(_, 2)),
+        AddData(input, 0, 1, 2, 3),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
@@ -252,6 +210,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
       .option("rowsPerSecond", "2")
       .load()
       .select('value)
+
     val query = df.writeStream
       .format("memory")
       .queryName("noharness")

