Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/03 22:30:49 UTC

spark git commit: [SPARK-18257][SS] Improve error reporting for FileStressSuite

Repository: spark
Updated Branches:
  refs/heads/master e89202523 -> f22954ad4


[SPARK-18257][SS] Improve error reporting for FileStressSuite

## What changes were proposed in this pull request?
This patch improves error reporting for FileStressSuite when there is an error in Spark itself (rather than in user code). It works by tightening the exception verification and removing the unnecessary thread that was used to start the stream.
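
The core of the tightened verification is the catch clause around `awaitTermination()` (shown in context in the diff below): only the failure the test injects on purpose is swallowed and counted as a restart, so any other exception, i.e. a real bug in Spark, propagates and fails the suite.

```scala
try {
  stream.awaitTermination()
} catch {
  // Swallow only the failure this suite injects on purpose; anything else
  // (a genuine bug in Spark) now propagates and fails the test.
  case e: StreamingQueryException
    if e.getCause != null && e.getCause.getCause != null &&
        e.getCause.getCause.getMessage.contains(injectedErrorMsg) =>
    failures += 1
}
```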

Also renamed the class from FileStressSuite to FileStreamStressSuite to make it more obvious that it is a streaming suite.

## How was this patch tested?
This is a test-only change. I manually verified the improved error reporting by injecting a bug into the addBatch code of FileStreamSink.
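
For illustration only (not the exact edit used for that manual verification), a fault injected into `FileStreamSink.addBatch` could look like the sketch below; with the tightened check above, such an exception now fails the suite instead of being counted as an expected restart.

```scala
// Hypothetical fault injection, sketched against the
// Sink.addBatch(batchId: Long, data: DataFrame) signature implemented by
// org.apache.spark.sql.execution.streaming.FileStreamSink.
override def addBatch(batchId: Long, data: DataFrame): Unit = {
  if (batchId == 2) {
    // Simulated Spark-side bug: this is not the message the stress test injects,
    // so it is reported as a real failure rather than silently swallowed.
    throw new IllegalStateException("simulated bug in FileStreamSink.addBatch")
  }
  // ... original write logic unchanged ...
}
```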

Author: Reynold Xin <rx...@databricks.com>

Closes #15757 from rxin/SPARK-18257.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f22954ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f22954ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f22954ad

Branch: refs/heads/master
Commit: f22954ad49bf5a32c7b6d8487cd38ffe0da904ca
Parents: e892025
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Nov 3 15:30:45 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Nov 3 15:30:45 2016 -0700

----------------------------------------------------------------------
 .../sql/streaming/FileStreamStressSuite.scala   | 156 +++++++++++++++++++
 .../spark/sql/streaming/FileStressSuite.scala   | 153 ------------------
 2 files changed, 156 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f22954ad/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamStressSuite.scala
new file mode 100644
index 0000000..28412ea
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamStressSuite.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+import java.util.UUID
+
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.util.Utils
+
+/**
+ * A stress test for streaming queries that read and write files.  This test consists of
+ * two threads:
+ *  - one that writes out `numRecords` distinct integers to files of random sizes (the total
+ *    number of records is fixed but each file's size / creation time is random).
+ *  - another that continually restarts a buggy streaming query (i.e. fails with 10% probability
+ *    on any partition).
+ *
+ * At the end, the resulting files are loaded and the answer is checked.
+ */
+class FileStreamStressSuite extends StreamTest {
+  import testImplicits._
+
+  // Error message thrown in the streaming job for testing recovery.
+  private val injectedErrorMsg = "test suite injected failure!"
+
+  testQuietly("fault tolerance stress test - unpartitioned output") {
+    stressTest(partitionWrites = false)
+  }
+
+  testQuietly("fault tolerance stress test - partitioned output") {
+    stressTest(partitionWrites = true)
+  }
+
+  def stressTest(partitionWrites: Boolean): Unit = {
+    val numRecords = 10000
+    val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
+    val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    @volatile
+    var continue = true
+    @volatile
+    var stream: StreamingQuery = null
+
+    val writer = new Thread("stream writer") {
+      override def run(): Unit = {
+        var i = numRecords
+        while (i > 0) {
+          val count = Random.nextInt(100)
+          var j = 0
+          var string = ""
+          while (j < count && i > 0) {
+            if (i % 10000 == 0) { logError(s"Wrote record $i") }
+            string = string + i + "\n"
+            j += 1
+            i -= 1
+          }
+
+          val uuid = UUID.randomUUID().toString
+          val fileName = new File(stagingDir, uuid)
+          stringToFile(fileName, string)
+          fileName.renameTo(new File(inputDir, uuid))
+          val sleep = Random.nextInt(100)
+          Thread.sleep(sleep)
+        }
+
+        logError("== DONE WRITING ==")
+        var done = false
+        while (!done) {
+          try {
+            stream.processAllAvailable()
+            done = true
+          } catch {
+            case NonFatal(_) =>
+          }
+        }
+
+        continue = false
+        stream.stop()
+      }
+    }
+    writer.start()
+
+    val input = spark.readStream.format("text").load(inputDir)
+
+    def startStream(): StreamingQuery = {
+      val errorMsg = injectedErrorMsg  // work around serialization issue
+      val output = input
+        .repartition(5)
+        .as[String]
+        .mapPartitions { iter =>
+          val rand = Random.nextInt(100)
+          if (rand < 10) {
+            sys.error(errorMsg)
+          }
+          iter.map(_.toLong)
+        }
+        .map(x => (x % 400, x.toString))
+        .toDF("id", "data")
+
+      if (partitionWrites) {
+        output
+          .writeStream
+          .partitionBy("id")
+          .format("parquet")
+          .option("checkpointLocation", checkpoint)
+          .start(outputDir)
+      } else {
+        output
+          .writeStream
+          .format("parquet")
+          .option("checkpointLocation", checkpoint)
+          .start(outputDir)
+      }
+    }
+
+    var failures = 0
+    while (continue) {
+      if (failures % 10 == 0) { logError(s"Query restart #$failures") }
+      stream = startStream()
+
+      try {
+        stream.awaitTermination()
+      } catch {
+        case e: StreamingQueryException
+          if e.getCause != null && e.getCause.getCause != null &&
+              e.getCause.getCause.getMessage.contains(injectedErrorMsg) =>
+          // Getting the expected error message
+          failures += 1
+      }
+    }
+
+    logError(s"Stream restarted $failures times.")
+    assert(spark.read.parquet(outputDir).distinct().count() == numRecords)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f22954ad/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
deleted file mode 100644
index f9e236c..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.io.File
-import java.util.UUID
-
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.util.Utils
-
-/**
- * A stress test for streaming queries that read and write files.  This test consists of
- * two threads:
- *  - one that writes out `numRecords` distinct integers to files of random sizes (the total
- *    number of records is fixed but each files size / creation time is random).
- *  - another that continually restarts a buggy streaming query (i.e. fails with 5% probability on
- *    any partition).
- *
- * At the end, the resulting files are loaded and the answer is checked.
- */
-class FileStressSuite extends StreamTest {
-  import testImplicits._
-
-  testQuietly("fault tolerance stress test - unpartitioned output") {
-    stressTest(partitionWrites = false)
-  }
-
-  testQuietly("fault tolerance stress test - partitioned output") {
-    stressTest(partitionWrites = true)
-  }
-
-  def stressTest(partitionWrites: Boolean): Unit = {
-    val numRecords = 10000
-    val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
-    val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
-    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
-    val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
-
-    @volatile
-    var continue = true
-    @volatile
-    var stream: StreamingQuery = null
-
-    val writer = new Thread("stream writer") {
-      override def run(): Unit = {
-        var i = numRecords
-        while (i > 0) {
-          val count = Random.nextInt(100)
-          var j = 0
-          var string = ""
-          while (j < count && i > 0) {
-            if (i % 10000 == 0) { logError(s"Wrote record $i") }
-            string = string + i + "\n"
-            j += 1
-            i -= 1
-          }
-
-          val uuid = UUID.randomUUID().toString
-          val fileName = new File(stagingDir, uuid)
-          stringToFile(fileName, string)
-          fileName.renameTo(new File(inputDir, uuid))
-          val sleep = Random.nextInt(100)
-          Thread.sleep(sleep)
-        }
-
-        logError("== DONE WRITING ==")
-        var done = false
-        while (!done) {
-          try {
-            stream.processAllAvailable()
-            done = true
-          } catch {
-            case NonFatal(_) =>
-          }
-        }
-
-        continue = false
-        stream.stop()
-      }
-    }
-    writer.start()
-
-    val input = spark.readStream.format("text").load(inputDir)
-
-    def startStream(): StreamingQuery = {
-      val output = input
-        .repartition(5)
-        .as[String]
-        .mapPartitions { iter =>
-          val rand = Random.nextInt(100)
-          if (rand < 10) {
-            sys.error("failure")
-          }
-          iter.map(_.toLong)
-        }
-        .map(x => (x % 400, x.toString))
-        .toDF("id", "data")
-
-      if (partitionWrites) {
-        output
-          .writeStream
-          .partitionBy("id")
-          .format("parquet")
-          .option("checkpointLocation", checkpoint)
-          .start(outputDir)
-      } else {
-        output
-          .writeStream
-          .format("parquet")
-          .option("checkpointLocation", checkpoint)
-          .start(outputDir)
-      }
-    }
-
-    var failures = 0
-    val streamThread = new Thread("stream runner") {
-      while (continue) {
-        if (failures % 10 == 0) { logError(s"Query restart #$failures") }
-        stream = startStream()
-
-        try {
-          stream.awaitTermination()
-        } catch {
-          case ce: StreamingQueryException =>
-            failures += 1
-        }
-      }
-    }
-
-    streamThread.join()
-
-    logError(s"Stream restarted $failures times.")
-    assert(spark.read.parquet(outputDir).distinct().count() == numRecords)
-  }
-}

