Posted to commits@spark.apache.org by an...@apache.org on 2015/01/28 21:53:25 UTC

spark git commit: [SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues

Repository: spark
Updated Branches:
  refs/heads/master 81f8f3406 -> 84b6ecdef


[SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues

In DriverSuite, we currently set a timeout of 60 seconds. If the process has not terminated by then, we leak it, because we never destroy it.

In SparkSubmitSuite, we currently set no timeout at all, so the test can hang indefinitely.
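
The fix in both suites follows the same shape: launch the child process, bound the wait, then destroy the process. A minimal sketch of that pattern (the command, working directory, and environment below are placeholders, and the imports mirror the ScalaTest ones the suites use):

    import java.io.File
    import org.scalatest.concurrent.Timeouts._
    import org.scalatest.time.SpanSugar._
    import org.apache.spark.util.Utils

    // Launch the child via the Utils.executeCommand helper introduced in
    // this commit, then bound how long we wait for it to finish.
    val process = Utils.executeCommand(
      Seq("./bin/some-script"),        // placeholder command
      new File("."),                   // placeholder working directory
      Map("SPARK_TESTING" -> "1"))     // placeholder environment
    failAfter(60 seconds) { process.waitFor() }
    process.destroy()                  // kill the child if it is still alive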

Author: Andrew Or <an...@databricks.com>

Closes #4230 from andrewor14/fix-driver-suite and squashes the following commits:

f5c80fd [Andrew Or] Fix timeout behaviors in both suites
8092c36 [Andrew Or] Stop SparkContext after every individual test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84b6ecde
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84b6ecde
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84b6ecde

Branch: refs/heads/master
Commit: 84b6ecdef63e6f5710a3f7f95f698b1d1ea44855
Parents: 81f8f34
Author: Andrew Or <an...@databricks.com>
Authored: Wed Jan 28 12:52:31 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Jan 28 12:53:22 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 87 ++++++++++----------
 .../scala/org/apache/spark/DriverSuite.scala    | 26 +++---
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 26 +++---
 3 files changed, 71 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84b6ecde/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c04e4d..86ac307 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -410,10 +410,10 @@ private[spark] object Utils extends Logging {
     // Decompress the file if it's a .tar or .tar.gz
     if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
     } else if (fileName.endsWith(".tar")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
     }
     // Make the file executable - That's necessary for scripts
     FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@@ -956,25 +956,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a command in the given working directory, throwing an exception if it completes
-   * with an exit code other than 0.
+   * Execute a command and return the process running the command.
    */
-  def execute(command: Seq[String], workingDir: File) {
-    val process = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-        .redirectErrorStream(true)
-        .start()
-    new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
+  def executeCommand(
+      command: Seq[String],
+      workingDir: File = new File("."),
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): Process = {
+    val builder = new ProcessBuilder(command: _*).directory(workingDir)
+    val environment = builder.environment()
+    for ((key, value) <- extraEnvironment) {
+      environment.put(key, value)
+    }
+    val process = builder.start()
+    if (redirectStderr) {
+      val threadName = "redirect stderr for command " + command(0)
+      def log(s: String): Unit = logInfo(s)
+      processStreamByLine(threadName, process.getErrorStream, log)
     }
+    process
   }
 
   /**
@@ -983,31 +983,13 @@ private[spark] object Utils extends Logging {
   def executeAndGetOutput(
       command: Seq[String],
       workingDir: File = new File("."),
-      extraEnvironment: Map[String, String] = Map.empty): String = {
-    val builder = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-    val environment = builder.environment()
-    for ((key, value) <- extraEnvironment) {
-      environment.put(key, value)
-    }
-
-    val process = builder.start()
-    new Thread("read stderr for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
-          logInfo(line)
-        }
-      }
-    }.start()
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): String = {
+    val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
     val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          output.append(line)
-        }
-      }
-    }
-    stdoutThread.start()
+    val threadName = "read stdout for " + command(0)
+    def appendToOutput(s: String): Unit = output.append(s)
+    val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
     val exitCode = process.waitFor()
     stdoutThread.join()   // Wait for it to finish reading output
     if (exitCode != 0) {
@@ -1018,6 +1000,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Return and start a daemon thread that processes the content of the input stream line by line.
+   */
+  def processStreamByLine(
+      threadName: String,
+      inputStream: InputStream,
+      processLine: String => Unit): Thread = {
+    val t = new Thread(threadName) {
+      override def run() {
+        for (line <- Source.fromInputStream(inputStream).getLines()) {
+          processLine(line)
+        }
+      }
+    }
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
+  /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
    */

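For orientation, a small usage sketch of the refactored helpers above (these calls only compile inside the org.apache.spark package, since Utils is private[spark]; the commands are illustrative only):

    import java.io.File
    import org.apache.spark.util.Utils

    // Capture stdout as a string; stderr is logged in the background
    // by default (redirectStderr = true).
    val listing = Utils.executeAndGetOutput(Seq("ls", "-l"), new File("."))

    // Or keep the Process handle and manage its lifetime directly.
    val sleeper = Utils.executeCommand(Seq("sleep", "30"))
    sleeper.destroy()

    // processStreamByLine returns the started daemon reader thread, so a
    // caller can join() it before inspecting what was accumulated.
    val output = new StringBuffer
    val p = Utils.executeCommand(Seq("echo", "hello"), redirectStderr = false)
    def appendToOutput(s: String): Unit = output.append(s)
    val reader = Utils.processStreamByLine("read stdout", p.getInputStream, appendToOutput)
    p.waitFor()
    reader.join()

Marking the reader threads as daemons means they can never prevent the JVM from exiting on their own.
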
http://git-wip-us.apache.org/repos/asf/spark/blob/84b6ecde/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 8a54360..9bd5dfe 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -28,31 +28,29 @@ import org.apache.spark.util.Utils
 
 class DriverSuite extends FunSuite with Timeouts {
 
-  test("driver should exit after finishing") {
+  test("driver should exit after finishing without cleanup (SPARK-530)") {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
-    val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
+    val masters = Table("master", "local", "local-cluster[2,1,512]")
     forAll(masters) { (master: String) =>
-      failAfter(60 seconds) {
-        Utils.executeAndGetOutput(
-          Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
-          new File(sparkHome),
-          Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-      }
+      val process = Utils.executeCommand(
+        Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+        new File(sparkHome),
+        Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+      failAfter(60 seconds) { process.waitFor() }
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
     }
   }
 }
 
 /**
- * Program that creates a Spark driver but doesn't call SparkContext.stop() or
- * Sys.exit() after finishing.
+ * Program that creates a Spark driver but doesn't call SparkContext#stop() or
+ * sys.exit() after finishing.
  */
 object DriverWithoutCleanup {
   def main(args: Array[String]) {
     Utils.configTestLog4j("INFO")
-    // Bind the web UI to an ephemeral port in order to avoid conflicts with other tests running on
-    // the same machine (we shouldn't just disable the UI here, since that might mask bugs):
-    val conf = new SparkConf().set("spark.ui.port", "0")
+    val conf = new SparkConf
     val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
     sc.parallelize(1 to 100, 4).count()
   }

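Aside: the Table/forAll pair in the test above is ScalaTest's table-driven property checks. A minimal sketch of the style (assuming the usual TableDrivenPropertyChecks import):

    import org.scalatest.prop.TableDrivenPropertyChecks._

    // The first argument names the column; the rest are rows. forAll runs
    // the body once per row and reports the offending row on failure.
    val masters = Table("master", "local", "local-cluster[2,1,512]")
    forAll(masters) { master =>
      assert(master.startsWith("local"))
    }
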
http://git-wip-us.apache.org/repos/asf/spark/blob/84b6ecde/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 065b753..82628ad 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,25 +21,28 @@ import java.io._
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark._
 import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.util.{ResetSystemProperties, Utils}
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
 
 // Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
 // of properties that need to be cleared after tests.
-class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
+class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties with Timeouts {
   def beforeAll() {
     System.setProperty("spark.testing", "true")
   }
 
-  val noOpOutputStream = new OutputStream {
+  private val noOpOutputStream = new OutputStream {
     def write(b: Int) = {}
   }
 
   /** Simple PrintStream that reads data into a buffer */
-  class BufferPrintStream extends PrintStream(noOpOutputStream) {
+  private class BufferPrintStream extends PrintStream(noOpOutputStream) {
     var lineBuffer = ArrayBuffer[String]()
     override def println(line: String) {
       lineBuffer += line
@@ -47,7 +50,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
   }
 
   /** Returns true if the script exits and the given search string is printed. */
-  def testPrematureExit(input: Array[String], searchString: String) = {
+  private def testPrematureExit(input: Array[String], searchString: String) = {
     val printStream = new BufferPrintStream()
     SparkSubmit.printStream = printStream
 
@@ -290,7 +293,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
       "--name", "testApp",
       "--master", "local",
-      "--conf", "spark.ui.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
@@ -305,7 +307,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
       "--name", "testApp",
       "--master", "local-cluster[2,1,512]",
       "--jars", jarsString,
-      "--conf", "spark.ui.enabled=false",
       unusedJar.toString)
     runSparkSubmit(args)
   }
@@ -430,15 +431,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
   }
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  def runSparkSubmit(args: Seq[String]): String = {
+  private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    Utils.executeAndGetOutput(
+    val process = Utils.executeCommand(
       Seq("./bin/spark-submit") ++ args,
       new File(sparkHome),
       Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+    failAfter(60 seconds) { process.waitFor() }
+    // Ensure we still kill the process in case it timed out
+    process.destroy()
   }
 
-  def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
+  private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
     val tmpDir = Utils.createTempDir()
 
     val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")

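One caveat worth noting: if failAfter throws on timeout, the destroy() call on the line after it is skipped, so a timed-out child could still linger. A slightly more defensive variant of runSparkSubmit (a sketch, not the committed code) wraps the wait in try/finally:

    private def runSparkSubmit(args: Seq[String]): Unit = {
      val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
      val process = Utils.executeCommand(
        Seq("./bin/spark-submit") ++ args,
        new File(sparkHome),
        Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
      try {
        failAfter(60 seconds) { process.waitFor() }
      } finally {
        process.destroy()  // reached even when failAfter times out
      }
    }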
