You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2018/01/03 01:13:25 UTC

incubator-toree git commit: [TOREE-443] Add --spark-context-initialization-timeout cmd-line option

Repository: incubator-toree
Updated Branches:
  refs/heads/master 0aafc3a7b -> 5048616c7


[TOREE-443] Add --spark-context-initialization-timeout cmd-line option

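Instead of hard-coding the Spark session initialization timeout to
100ms, a --spark-context-initialization-timeout command-line option
has been introduced to make it configurable. The value is in
milliseconds and must be positive; it defaults to 100 (set in
reference.conf) and can also be overridden via the
SPARK_CONTEXT_INITIALIZATION_TIMEOUT environment variable. On
slow/busy nodes, an appropriate timeout can be specified to avoid
confusing duplicate initialization messages in the notebook UI.
Note: as implemented in this commit, the command-line option and its
config key are spelled without the second "i"
(--spark-context-intialization-timeout and
spark_context_intialization_timeout). Added unit tests to
CommandLineOptionsSpec.scala and KernelSpec.scala.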

Closes #147


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/5048616c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/5048616c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/5048616c

Branch: refs/heads/master
Commit: 5048616c78aed9f829ac96ae1d96f81b49ae5afd
Parents: 0aafc3a
Author: Sanjay Saxena <sa...@gmail.com>
Authored: Wed Nov 15 13:52:30 2017 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Jan 2 17:09:38 2018 -0800

----------------------------------------------------------------------
 .../apache/toree/boot/CommandLineOptions.scala  |  10 +-
 .../org/apache/toree/kernel/api/Kernel.scala    |  21 +++-
 .../toree/boot/CommandLineOptionsSpec.scala     | 115 +++++++++++++++----
 .../apache/toree/kernel/api/KernelSpec.scala    |  43 +++++--
 resources/compile/application.conf              |   1 +
 resources/compile/reference.conf                |   3 +
 resources/test/application.conf                 |   1 +
 resources/test/reference.conf                   |   3 +
 8 files changed, 163 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
index 1187bba..4fc021f 100644
--- a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
+++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
@@ -19,9 +19,7 @@ package org.apache.toree.boot
 
 import java.io.{File, OutputStream}
 
-import org.apache.toree.utils.KeyValuePairUtils
 import com.typesafe.config.{Config, ConfigFactory}
-import joptsimple.util.KeyValuePair
 import joptsimple.{OptionParser, OptionSpec}
 
 import scala.collection.JavaConverters._
@@ -100,6 +98,13 @@ class CommandLineOptions(args: Seq[String]) {
     "interpreter-plugin"
   ).withRequiredArg().ofType(classOf[String])
 
+  private val _spark_context_intialization_timeout = parser.accepts(
+    "spark-context-intialization-timeout",
+    "The time (in milliseconds) allowed for creation of the spark context. " +
+      "Failure to create a context in this time could result in duplicate initialization messages. " +
+      "The default value is 100 milliseconds."
+  ).withRequiredArg().ofType(classOf[Long])
+
   private val options = parser.parse(args.map(_.trim): _*)
 
   /*
@@ -149,6 +154,7 @@ class CommandLineOptions(args: Seq[String]) {
       "magic_urls" -> getAll(_magic_url).map(_.asJava)
         .flatMap(list => if (list.isEmpty) None else Some(list)),
       "max_interpreter_threads" -> get(_max_interpreter_threads),
+      "spark_context_intialization_timeout" -> get(_spark_context_intialization_timeout),
       "jar_dir" -> get(_jar_dir),
       "default_interpreter" -> get(_default_interpreter),
       "nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)),

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index 65a8b98..6900155 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -392,6 +392,17 @@ class Kernel (
     sparkContext
   }
 
+  // TODO: Exposed for testing purposes.
+  protected[toree] def getSparkContextInitializationTimeout: Long = {
+    val timeout:Long = config.getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+    if (timeout <= 0) {
+      val clOptionName = "spark-context-intialization-timeout"
+      throw new RuntimeException(s"--$clOptionName: Invalid timeout of '$timeout' milliseconds specified. " +
+        s"Must specify a positive value.")
+    }
+    timeout
+  }
+
   override def interpreter(name: String): Option[Interpreter] = {
     interpreterManager.interpreters.get(name)
   }
@@ -401,17 +412,19 @@ class Kernel (
   override def sparkSession: SparkSession = {
     defaultSparkConf.getOption("spark.master") match {
       case Some(master) if !master.contains("local") =>
-        // when connecting to a remote cluster, the first call to getOrCreate
+        // When connecting to a remote cluster, the first call to getOrCreate
         // may create a session and take a long time, so this starts a future
-        // to get the session. if it take longer than 100 ms, then print a
-        // message to the user that Spark is starting.
+        // to get the session. If it take longer than specified timeout, then
+        // print a message to the user that Spark is starting. Note, the
+        // default timeout is 100ms and it is specified in reference.conf.
         import scala.concurrent.ExecutionContext.Implicits.global
         val sessionFuture = Future {
           SparkSession.builder.config(defaultSparkConf).getOrCreate
         }
 
         try {
-          Await.result(sessionFuture, Duration(100, TimeUnit.MILLISECONDS))
+          val timeout = getSparkContextInitializationTimeout
+          Await.result(sessionFuture, Duration(timeout, TimeUnit.MILLISECONDS))
         } catch {
           case timeout: TimeoutException =>
             // getting the session is taking a long time, so assume that Spark

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
index cc1173c..d79014d 100644
--- a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
@@ -37,7 +37,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
 
         val actual = options.toConfig.getInt("max_interpreter_threads")
 
-        actual should be (expected)
+        actual should be(expected)
       }
     }
 
@@ -45,7 +45,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       it("should set the help flag to true") {
         val options = new CommandLineOptions("--help" :: Nil)
 
-        options.help should be (true)
+        options.help should be(true)
       }
     }
 
@@ -53,7 +53,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       it("should set the help flag to true") {
         val options = new CommandLineOptions("-h" :: Nil)
 
-        options.help should be (true)
+        options.help should be(true)
       }
     }
 
@@ -61,7 +61,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       it("should set the help flag to false") {
         val options = new CommandLineOptions(Nil)
 
-        options.help should be (false)
+        options.help should be(false)
       }
     }
 
@@ -69,7 +69,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       it("should set the version flag to true") {
         val options = new CommandLineOptions("--version" :: Nil)
 
-        options.version should be (true)
+        options.version should be(true)
       }
     }
 
@@ -77,7 +77,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       it("should set the version flag to true") {
         val options = new CommandLineOptions("-v" :: Nil)
 
-        options.version should be (true)
+        options.version should be(true)
       }
     }
 
@@ -85,7 +85,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       it("should set the version flag to false") {
         val options = new CommandLineOptions(Nil)
 
-        options.version should be (false)
+        options.version should be(false)
       }
     }
 
@@ -100,7 +100,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
         it("should include values specified in file") {
 
           val pathToProfileFixture: String = new File(getClass.getResource("/fixtures/profile.json").toURI).getAbsolutePath
-          val options = new CommandLineOptions(Seq("--profile="+pathToProfileFixture))
+          val options = new CommandLineOptions(Seq("--profile=" + pathToProfileFixture))
 
           val config: Config = options.toConfig
 
@@ -114,7 +114,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       }
     }
 
-    describe("when received --<protocol port name>=<value>"){
+    describe("when received --<protocol port name>=<value>") {
       it("should error if value is not set") {
         intercept[OptionException] {
           new CommandLineOptions(Seq("--stdin-port"))
@@ -156,7 +156,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
       }
     }
 
-    describe("when received --profile and --<protocol port name>=<value>"){
+    describe("when received --profile and --<protocol port name>=<value>") {
       describe("#toConfig") {
         it("should return config with <protocol port> argument value") {
 
@@ -175,7 +175,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
 
     }
 
-    describe("when no arguments are received"){
+    describe("when no arguments are received") {
       describe("#toConfig") {
         it("should read default value set in reference.conf") {
 
@@ -186,41 +186,42 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
           config.getInt("shell_port") should be(40544)
           config.getInt("iopub_port") should be(43462)
           config.getInt("control_port") should be(44808)
-          config.getInt("max_interpreter_threads") should be (4)
+          config.getInt("max_interpreter_threads") should be(4)
+          config.getInt("spark_context_intialization_timeout") should be(100)
         }
       }
     }
 
-    describe("when using -- to separate interpreter arguments"){
+    describe("when using -- to separate interpreter arguments") {
       describe("#toConfig") {
         it("should return interpreter_args config property when there are args before --") {
 
           val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--", "someArg1", "someArg2", "someArg3"))
 
-          val config: Config = options .toConfig
+          val config: Config = options.toConfig
 
           config.entrySet() should not be ('empty)
-          config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3"))
+          config.getStringList("interpreter_args").asScala should be(List("someArg1", "someArg2", "someArg3"))
         }
 
         it("should return interpreter_args config property when args is at the beginning") {
 
           val options = new CommandLineOptions(List("--", "someArg1", "someArg2", "someArg3"))
 
-          val config: Config = options .toConfig
+          val config: Config = options.toConfig
 
           config.entrySet() should not be ('empty)
-          config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3"))
+          config.getStringList("interpreter_args").asScala should be(List("someArg1", "someArg2", "someArg3"))
         }
 
         it("should return interpreter_args config property as empty list when there is nothing after --") {
 
           val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--"))
 
-          val config: Config = options .toConfig
+          val config: Config = options.toConfig
 
           config.entrySet() should not be ('empty)
-          config.getStringList("interpreter_args").asScala should be ('empty)
+          config.getStringList("interpreter_args").asScala should be('empty)
         }
       }
     }
@@ -262,7 +263,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
         val config: Config = options.toConfig
 
         config.getList("magic_urls").unwrapped.asScala should
-          be (Seq(url1, url2))
+          be(Seq(url1, url2))
       }
     }
 
@@ -281,6 +282,78 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
 
       }
     }
-  }
 
+    describe("when dealing with --spark-context-intialization-timeout") {
+      val key = "spark_context_intialization_timeout"
+
+      it("when none of the options are specified, it should default to 100") {
+        val options = new CommandLineOptions(Nil)
+        val config: Config = options.toConfig
+        config.getInt(key) should be(100)
+      }
+
+      it("when other options are specified, it should default to 100") {
+        val options = new CommandLineOptions(Seq(
+          "--interpreter-plugin",
+          "dummy:test.utils.DummyInterpreter"
+        ))
+        val config: Config = options.toConfig
+        config.getInt(key) should be(100)
+      }
+
+      it("when the options is specified, it should return the specified value") {
+        val options = new CommandLineOptions(List(
+          "--stdin-port", "99999",
+          "--shell-port", "88888",
+          "--iopub-port", "77777",
+          "--control-port", "55555",
+          "--heartbeat-port", "44444",
+          "--spark-context-intialization-timeout", "30000"
+        ))
+        val config: Config = options.toConfig
+        config.getInt(key) should be(30000)
+      }
+
+      it("when a negative value is specified, it should return the specified value") {
+        val options = new CommandLineOptions(List(
+          "--stdin-port", "99999",
+          "--shell-port", "88888",
+          "--iopub-port", "77777",
+          "--control-port", "55555",
+          "--heartbeat-port", "44444",
+          "--spark-context-intialization-timeout", "-1"
+        ))
+        val config: Config = options.toConfig
+        config.getInt(key) should be(-1)
+      }
+
+      it("when an invalid value is specified, an exception must be thrown") {
+        intercept [OptionException] {
+          val options = new CommandLineOptions(List(
+            "--stdin-port", "99999",
+            "--shell-port", "88888",
+            "--iopub-port", "77777",
+            "--control-port", "55555",
+            "--heartbeat-port", "44444",
+            "--spark-context-intialization-timeout", "foo"
+          ))
+          val config: Config = options.toConfig
+        }
+      }
+
+      it("when a value is not specified, an exception must be thrown") {
+        intercept [OptionException] {
+          val options = new CommandLineOptions(List(
+            "--stdin-port", "99999",
+            "--shell-port", "88888",
+            "--iopub-port", "77777",
+            "--control-port", "55555",
+            "--heartbeat-port", "44444",
+            "--spark-context-intialization-timeout", ""
+          ))
+          val config: Config = options.toConfig
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
index 43ad8a8..de8d9d7 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
@@ -18,6 +18,7 @@
 package org.apache.toree.kernel.api
 
 import java.io.{InputStream, PrintStream}
+import java.util.concurrent.TimeUnit
 
 import com.typesafe.config.Config
 import org.apache.spark.{SparkConf, SparkContext}
@@ -103,7 +104,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
       }
 
       it("should return error on None") {
-        kernel eval None should be ((false, Map("text/plain" -> "Error!")))
+        kernel eval None should be((false, Map("text/plain" -> "Error!")))
       }
     }
 
@@ -119,7 +120,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
           new KernelMessage(Nil, "", mock[Header], mock[ParentHeader],
             mock[Metadata], "")
         )
-        kernel.out shouldBe a [PrintStream]
+        kernel.out shouldBe a[PrintStream]
       }
     }
 
@@ -137,7 +138,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
         )
 
         // TODO: Access the underlying streamType field to assert stderr?
-        kernel.err shouldBe a [PrintStream]
+        kernel.err shouldBe a[PrintStream]
       }
     }
 
@@ -154,7 +155,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
             mock[Metadata], "")
         )
 
-        kernel.in shouldBe a [InputStream]
+        kernel.in shouldBe a[InputStream]
       }
     }
 
@@ -171,7 +172,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
             mock[Metadata], "")
         )
 
-        kernel.stream shouldBe a [StreamMethods]
+        kernel.stream shouldBe a[StreamMethods]
       }
     }
 
@@ -188,7 +189,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
             mock[Metadata], "")
         )
 
-        kernel.display shouldBe a [DisplayMethods]
+        kernel.display shouldBe a[DisplayMethods]
       }
     }
 
@@ -200,7 +201,35 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
 
         val sparkConf = kernel.createSparkConf(new SparkConf().setMaster(expected))
 
-        sparkConf.get("spark.master") should be (expected)
+        sparkConf.get("spark.master") should be(expected)
+      }
+    }
+
+    describe("when spark-context-initialization-timeout is a valid value") {
+
+      it("should use the specified timeout to initialize spark context") {
+        val expectedTimeout: Long = 30000
+        doReturn(expectedTimeout).when(mockConfig).getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+
+        kernel.getSparkContextInitializationTimeout should be(expectedTimeout)
+      }
+
+      it("should throw an exception when negative value is specified as timeout") {
+        intercept[RuntimeException] {
+          val timeout: Long = -30000
+          doReturn(timeout).when(mockConfig).getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+
+          kernel.getSparkContextInitializationTimeout
+        }
+      }
+
+      it("should throw an exception when zero is specified as timeout") {
+        intercept[RuntimeException] {
+          val timeout: Long = 0
+          doReturn(timeout).when(mockConfig).getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+
+          kernel.getSparkContextInitializationTimeout
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/compile/application.conf
----------------------------------------------------------------------
diff --git a/resources/compile/application.conf b/resources/compile/application.conf
index 4c85f99..8c38f7d 100644
--- a/resources/compile/application.conf
+++ b/resources/compile/application.conf
@@ -26,3 +26,4 @@ control_port  = ${?PORT1}
 hb_port       = ${?PORT2}
 shell_port    = ${?PORT3}
 iopub_port    = ${?PORT4}
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/compile/reference.conf
----------------------------------------------------------------------
diff --git a/resources/compile/reference.conf b/resources/compile/reference.conf
index a8a2112..34bcd6d 100644
--- a/resources/compile/reference.conf
+++ b/resources/compile/reference.conf
@@ -55,6 +55,9 @@ deps_dir = ${?DEPS_DIR}
 default_interpreter = "Scala"
 default_interpreter = ${?DEFAULT_INTERPRETER}
 
+spark_context_intialization_timeout = 100
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
+
 default_interpreter_plugin = [
   "Scala:org.apache.toree.kernel.interpreter.scala.ScalaInterpreter",
   "PySpark:org.apache.toree.kernel.interpreter.pyspark.PySparkInterpreter",

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/test/application.conf
----------------------------------------------------------------------
diff --git a/resources/test/application.conf b/resources/test/application.conf
index 4c85f99..8c38f7d 100644
--- a/resources/test/application.conf
+++ b/resources/test/application.conf
@@ -26,3 +26,4 @@ control_port  = ${?PORT1}
 hb_port       = ${?PORT2}
 shell_port    = ${?PORT3}
 iopub_port    = ${?PORT4}
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/test/reference.conf
----------------------------------------------------------------------
diff --git a/resources/test/reference.conf b/resources/test/reference.conf
index c7c9363..5f06560 100644
--- a/resources/test/reference.conf
+++ b/resources/test/reference.conf
@@ -54,6 +54,9 @@ send_empty_output = ${?SEND_EMPTY_OUTPUT}
 default_interpreter = "Scala"
 default_interpreter = ${?DEFAULT_INTERPRETER}
 
+spark_context_intialization_timeout = 100
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
+
 default_interpreter_plugin = [
   "Scala:org.apache.toree.kernel.interpreter.scala.ScalaInterpreter",
   "PySpark:org.apache.toree.kernel.interpreter.pyspark.PySparkInterpreter",