You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@toree.apache.org by lr...@apache.org on 2018/01/03 01:13:25 UTC
incubator-toree git commit: [TOREE-443] Add
--spark-context-initialization-timeout cmd-line option
Repository: incubator-toree
Updated Branches:
refs/heads/master 0aafc3a7b -> 5048616c7
[TOREE-443] Add --spark-context-initialization-timeout cmd-line option
Instead of hardcoding the timeout to 100ms, a
--spark-context-initialization-timeout command-line option
has been introduced to make it configurable. On slow/busy nodes,
an appropriate timeout can be specified to avoid the confusing
messages in the notebook UI. Added unit tests to
CommandLineOptionsSpec.scala and KernelSpec.scala.
Closes #147
Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/5048616c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/5048616c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/5048616c
Branch: refs/heads/master
Commit: 5048616c78aed9f829ac96ae1d96f81b49ae5afd
Parents: 0aafc3a
Author: Sanjay Saxena <sa...@gmail.com>
Authored: Wed Nov 15 13:52:30 2017 -0800
Committer: Luciano Resende <lr...@apache.org>
Committed: Tue Jan 2 17:09:38 2018 -0800
----------------------------------------------------------------------
.../apache/toree/boot/CommandLineOptions.scala | 10 +-
.../org/apache/toree/kernel/api/Kernel.scala | 21 +++-
.../toree/boot/CommandLineOptionsSpec.scala | 115 +++++++++++++++----
.../apache/toree/kernel/api/KernelSpec.scala | 43 +++++--
resources/compile/application.conf | 1 +
resources/compile/reference.conf | 3 +
resources/test/application.conf | 1 +
resources/test/reference.conf | 3 +
8 files changed, 163 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
index 1187bba..4fc021f 100644
--- a/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
+++ b/kernel/src/main/scala/org/apache/toree/boot/CommandLineOptions.scala
@@ -19,9 +19,7 @@ package org.apache.toree.boot
import java.io.{File, OutputStream}
-import org.apache.toree.utils.KeyValuePairUtils
import com.typesafe.config.{Config, ConfigFactory}
-import joptsimple.util.KeyValuePair
import joptsimple.{OptionParser, OptionSpec}
import scala.collection.JavaConverters._
@@ -100,6 +98,13 @@ class CommandLineOptions(args: Seq[String]) {
"interpreter-plugin"
).withRequiredArg().ofType(classOf[String])
+ private val _spark_context_intialization_timeout = parser.accepts(
+ "spark-context-intialization-timeout",
+ "The time (in milliseconds) allowed for creation of the spark context. " +
+ "Failure to create a context in this time could result in duplicate initialization messages. " +
+ "The default value is 100 milliseconds."
+ ).withRequiredArg().ofType(classOf[Long])
+
private val options = parser.parse(args.map(_.trim): _*)
/*
@@ -149,6 +154,7 @@ class CommandLineOptions(args: Seq[String]) {
"magic_urls" -> getAll(_magic_url).map(_.asJava)
.flatMap(list => if (list.isEmpty) None else Some(list)),
"max_interpreter_threads" -> get(_max_interpreter_threads),
+ "spark_context_intialization_timeout" -> get(_spark_context_intialization_timeout),
"jar_dir" -> get(_jar_dir),
"default_interpreter" -> get(_default_interpreter),
"nosparkcontext" -> (if (has(_nosparkcontext)) Some(true) else Some(false)),
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
index 65a8b98..6900155 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/Kernel.scala
@@ -392,6 +392,17 @@ class Kernel (
sparkContext
}
+ // TODO: Exposed for testing purposes.
+ protected[toree] def getSparkContextInitializationTimeout: Long = {
+ val timeout:Long = config.getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+ if (timeout <= 0) {
+ val clOptionName = "spark-context-intialization-timeout"
+ throw new RuntimeException(s"--$clOptionName: Invalid timeout of '$timeout' milliseconds specified. " +
+ s"Must specify a positive value.")
+ }
+ timeout
+ }
+
override def interpreter(name: String): Option[Interpreter] = {
interpreterManager.interpreters.get(name)
}
@@ -401,17 +412,19 @@ class Kernel (
override def sparkSession: SparkSession = {
defaultSparkConf.getOption("spark.master") match {
case Some(master) if !master.contains("local") =>
- // when connecting to a remote cluster, the first call to getOrCreate
+ // When connecting to a remote cluster, the first call to getOrCreate
// may create a session and take a long time, so this starts a future
- // to get the session. if it take longer than 100 ms, then print a
- // message to the user that Spark is starting.
+ // to get the session. If it takes longer than the specified timeout, then
+ // print a message to the user that Spark is starting. Note, the
+ // default timeout is 100ms and it is specified in reference.conf.
import scala.concurrent.ExecutionContext.Implicits.global
val sessionFuture = Future {
SparkSession.builder.config(defaultSparkConf).getOrCreate
}
try {
- Await.result(sessionFuture, Duration(100, TimeUnit.MILLISECONDS))
+ val timeout = getSparkContextInitializationTimeout
+ Await.result(sessionFuture, Duration(timeout, TimeUnit.MILLISECONDS))
} catch {
case timeout: TimeoutException =>
// getting the session is taking a long time, so assume that Spark
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
index cc1173c..d79014d 100644
--- a/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/boot/CommandLineOptionsSpec.scala
@@ -37,7 +37,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
val actual = options.toConfig.getInt("max_interpreter_threads")
- actual should be (expected)
+ actual should be(expected)
}
}
@@ -45,7 +45,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should set the help flag to true") {
val options = new CommandLineOptions("--help" :: Nil)
- options.help should be (true)
+ options.help should be(true)
}
}
@@ -53,7 +53,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should set the help flag to true") {
val options = new CommandLineOptions("-h" :: Nil)
- options.help should be (true)
+ options.help should be(true)
}
}
@@ -61,7 +61,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should set the help flag to false") {
val options = new CommandLineOptions(Nil)
- options.help should be (false)
+ options.help should be(false)
}
}
@@ -69,7 +69,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should set the version flag to true") {
val options = new CommandLineOptions("--version" :: Nil)
- options.version should be (true)
+ options.version should be(true)
}
}
@@ -77,7 +77,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should set the version flag to true") {
val options = new CommandLineOptions("-v" :: Nil)
- options.version should be (true)
+ options.version should be(true)
}
}
@@ -85,7 +85,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should set the version flag to false") {
val options = new CommandLineOptions(Nil)
- options.version should be (false)
+ options.version should be(false)
}
}
@@ -100,7 +100,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
it("should include values specified in file") {
val pathToProfileFixture: String = new File(getClass.getResource("/fixtures/profile.json").toURI).getAbsolutePath
- val options = new CommandLineOptions(Seq("--profile="+pathToProfileFixture))
+ val options = new CommandLineOptions(Seq("--profile=" + pathToProfileFixture))
val config: Config = options.toConfig
@@ -114,7 +114,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
}
}
- describe("when received --<protocol port name>=<value>"){
+ describe("when received --<protocol port name>=<value>") {
it("should error if value is not set") {
intercept[OptionException] {
new CommandLineOptions(Seq("--stdin-port"))
@@ -156,7 +156,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
}
}
- describe("when received --profile and --<protocol port name>=<value>"){
+ describe("when received --profile and --<protocol port name>=<value>") {
describe("#toConfig") {
it("should return config with <protocol port> argument value") {
@@ -175,7 +175,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
}
- describe("when no arguments are received"){
+ describe("when no arguments are received") {
describe("#toConfig") {
it("should read default value set in reference.conf") {
@@ -186,41 +186,42 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
config.getInt("shell_port") should be(40544)
config.getInt("iopub_port") should be(43462)
config.getInt("control_port") should be(44808)
- config.getInt("max_interpreter_threads") should be (4)
+ config.getInt("max_interpreter_threads") should be(4)
+ config.getInt("spark_context_intialization_timeout") should be(100)
}
}
}
- describe("when using -- to separate interpreter arguments"){
+ describe("when using -- to separate interpreter arguments") {
describe("#toConfig") {
it("should return interpreter_args config property when there are args before --") {
val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--", "someArg1", "someArg2", "someArg3"))
- val config: Config = options .toConfig
+ val config: Config = options.toConfig
config.entrySet() should not be ('empty)
- config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3"))
+ config.getStringList("interpreter_args").asScala should be(List("someArg1", "someArg2", "someArg3"))
}
it("should return interpreter_args config property when args is at the beginning") {
val options = new CommandLineOptions(List("--", "someArg1", "someArg2", "someArg3"))
- val config: Config = options .toConfig
+ val config: Config = options.toConfig
config.entrySet() should not be ('empty)
- config.getStringList("interpreter_args").asScala should be (List("someArg1", "someArg2", "someArg3"))
+ config.getStringList("interpreter_args").asScala should be(List("someArg1", "someArg2", "someArg3"))
}
it("should return interpreter_args config property as empty list when there is nothing after --") {
val options = new CommandLineOptions(List("--stdin-port", "99999", "--shell-port", "88888", "--"))
- val config: Config = options .toConfig
+ val config: Config = options.toConfig
config.entrySet() should not be ('empty)
- config.getStringList("interpreter_args").asScala should be ('empty)
+ config.getStringList("interpreter_args").asScala should be('empty)
}
}
}
@@ -262,7 +263,7 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
val config: Config = options.toConfig
config.getList("magic_urls").unwrapped.asScala should
- be (Seq(url1, url2))
+ be(Seq(url1, url2))
}
}
@@ -281,6 +282,78 @@ class CommandLineOptionsSpec extends FunSpec with Matchers {
}
}
- }
+ describe("when dealing with --spark-context-intialization-timeout") {
+ val key = "spark_context_intialization_timeout"
+
+ it("when none of the options are specified, it should default to 100") {
+ val options = new CommandLineOptions(Nil)
+ val config: Config = options.toConfig
+ config.getInt(key) should be(100)
+ }
+
+ it("when other options are specified, it should default to 100") {
+ val options = new CommandLineOptions(Seq(
+ "--interpreter-plugin",
+ "dummy:test.utils.DummyInterpreter"
+ ))
+ val config: Config = options.toConfig
+ config.getInt(key) should be(100)
+ }
+
+ it("when the options is specified, it should return the specified value") {
+ val options = new CommandLineOptions(List(
+ "--stdin-port", "99999",
+ "--shell-port", "88888",
+ "--iopub-port", "77777",
+ "--control-port", "55555",
+ "--heartbeat-port", "44444",
+ "--spark-context-intialization-timeout", "30000"
+ ))
+ val config: Config = options.toConfig
+ config.getInt(key) should be(30000)
+ }
+
+ it("when a negative value is specified, it should return the specified value") {
+ val options = new CommandLineOptions(List(
+ "--stdin-port", "99999",
+ "--shell-port", "88888",
+ "--iopub-port", "77777",
+ "--control-port", "55555",
+ "--heartbeat-port", "44444",
+ "--spark-context-intialization-timeout", "-1"
+ ))
+ val config: Config = options.toConfig
+ config.getInt(key) should be(-1)
+ }
+
+ it("when an invalid value is specified, an exception must be thrown") {
+ intercept [OptionException] {
+ val options = new CommandLineOptions(List(
+ "--stdin-port", "99999",
+ "--shell-port", "88888",
+ "--iopub-port", "77777",
+ "--control-port", "55555",
+ "--heartbeat-port", "44444",
+ "--spark-context-intialization-timeout", "foo"
+ ))
+ val config: Config = options.toConfig
+ }
+ }
+
+ it("when a value is not specified, an exception must be thrown") {
+ intercept [OptionException] {
+ val options = new CommandLineOptions(List(
+ "--stdin-port", "99999",
+ "--shell-port", "88888",
+ "--iopub-port", "77777",
+ "--control-port", "55555",
+ "--heartbeat-port", "44444",
+ "--spark-context-intialization-timeout", ""
+ ))
+ val config: Config = options.toConfig
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
index 43ad8a8..de8d9d7 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/api/KernelSpec.scala
@@ -18,6 +18,7 @@
package org.apache.toree.kernel.api
import java.io.{InputStream, PrintStream}
+import java.util.concurrent.TimeUnit
import com.typesafe.config.Config
import org.apache.spark.{SparkConf, SparkContext}
@@ -103,7 +104,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
}
it("should return error on None") {
- kernel eval None should be ((false, Map("text/plain" -> "Error!")))
+ kernel eval None should be((false, Map("text/plain" -> "Error!")))
}
}
@@ -119,7 +120,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
new KernelMessage(Nil, "", mock[Header], mock[ParentHeader],
mock[Metadata], "")
)
- kernel.out shouldBe a [PrintStream]
+ kernel.out shouldBe a[PrintStream]
}
}
@@ -137,7 +138,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
)
// TODO: Access the underlying streamType field to assert stderr?
- kernel.err shouldBe a [PrintStream]
+ kernel.err shouldBe a[PrintStream]
}
}
@@ -154,7 +155,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
mock[Metadata], "")
)
- kernel.in shouldBe a [InputStream]
+ kernel.in shouldBe a[InputStream]
}
}
@@ -171,7 +172,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
mock[Metadata], "")
)
- kernel.stream shouldBe a [StreamMethods]
+ kernel.stream shouldBe a[StreamMethods]
}
}
@@ -188,7 +189,7 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
mock[Metadata], "")
)
- kernel.display shouldBe a [DisplayMethods]
+ kernel.display shouldBe a[DisplayMethods]
}
}
@@ -200,7 +201,35 @@ class KernelSpec extends FunSpec with Matchers with MockitoSugar
val sparkConf = kernel.createSparkConf(new SparkConf().setMaster(expected))
- sparkConf.get("spark.master") should be (expected)
+ sparkConf.get("spark.master") should be(expected)
+ }
+ }
+
+ describe("when spark-context-initialization-timeout is a valid value") {
+
+ it("should use the specified timeout to initialize spark context") {
+ val expectedTimeout: Long = 30000
+ doReturn(expectedTimeout).when(mockConfig).getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+
+ kernel.getSparkContextInitializationTimeout should be(expectedTimeout)
+ }
+
+ it("should throw an exception when negative value is specified as timeout") {
+ intercept[RuntimeException] {
+ val timeout: Long = -30000
+ doReturn(timeout).when(mockConfig).getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+
+ kernel.getSparkContextInitializationTimeout
+ }
+ }
+
+ it("should throw an exception when zero is specified as timeout") {
+ intercept[RuntimeException] {
+ val timeout: Long = 0
+ doReturn(timeout).when(mockConfig).getDuration("spark_context_intialization_timeout", TimeUnit.MILLISECONDS)
+
+ kernel.getSparkContextInitializationTimeout
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/compile/application.conf
----------------------------------------------------------------------
diff --git a/resources/compile/application.conf b/resources/compile/application.conf
index 4c85f99..8c38f7d 100644
--- a/resources/compile/application.conf
+++ b/resources/compile/application.conf
@@ -26,3 +26,4 @@ control_port = ${?PORT1}
hb_port = ${?PORT2}
shell_port = ${?PORT3}
iopub_port = ${?PORT4}
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/compile/reference.conf
----------------------------------------------------------------------
diff --git a/resources/compile/reference.conf b/resources/compile/reference.conf
index a8a2112..34bcd6d 100644
--- a/resources/compile/reference.conf
+++ b/resources/compile/reference.conf
@@ -55,6 +55,9 @@ deps_dir = ${?DEPS_DIR}
default_interpreter = "Scala"
default_interpreter = ${?DEFAULT_INTERPRETER}
+spark_context_intialization_timeout = 100
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
+
default_interpreter_plugin = [
"Scala:org.apache.toree.kernel.interpreter.scala.ScalaInterpreter",
"PySpark:org.apache.toree.kernel.interpreter.pyspark.PySparkInterpreter",
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/test/application.conf
----------------------------------------------------------------------
diff --git a/resources/test/application.conf b/resources/test/application.conf
index 4c85f99..8c38f7d 100644
--- a/resources/test/application.conf
+++ b/resources/test/application.conf
@@ -26,3 +26,4 @@ control_port = ${?PORT1}
hb_port = ${?PORT2}
shell_port = ${?PORT3}
iopub_port = ${?PORT4}
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/5048616c/resources/test/reference.conf
----------------------------------------------------------------------
diff --git a/resources/test/reference.conf b/resources/test/reference.conf
index c7c9363..5f06560 100644
--- a/resources/test/reference.conf
+++ b/resources/test/reference.conf
@@ -54,6 +54,9 @@ send_empty_output = ${?SEND_EMPTY_OUTPUT}
default_interpreter = "Scala"
default_interpreter = ${?DEFAULT_INTERPRETER}
+spark_context_intialization_timeout = 100
+spark_context_intialization_timeout = ${?SPARK_CONTEXT_INITIALIZATION_TIMEOUT}
+
default_interpreter_plugin = [
"Scala:org.apache.toree.kernel.interpreter.scala.ScalaInterpreter",
"PySpark:org.apache.toree.kernel.interpreter.pyspark.PySparkInterpreter",