You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2017/11/22 12:35:52 UTC
spark git commit: [SPARK-22572][SPARK SHELL] spark-shell does not re-initialize on :replay
Repository: spark
Updated Branches:
refs/heads/master 572af5027 -> 327d25fe1
[SPARK-22572][SPARK SHELL] spark-shell does not re-initialize on :replay
## What changes were proposed in this pull request?
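Ticket: [SPARK-22572](https://issues.apache.org/jira/browse/SPARK-22572)
In both the scala-2.11 and scala-2.12 versions of `SparkILoop`, the Spark initialization commands are extracted into `initializationCommands` and run inside `savingReplayStack`, and `replay()` is overridden to call `initializeSpark()` before `super.replay()`.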
## How was this patch tested?
Added a new test case (`:replay should work correctly`) to `org.apache.spark.repl.ReplSuite`.
Author: Mark Petruska <pe...@gmail.com>
Closes #19791 from mpetruska/SPARK-22572.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/327d25fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/327d25fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/327d25fe
Branch: refs/heads/master
Commit: 327d25fe1741f62cd84097e94739f82ecb05383a
Parents: 572af50
Author: Mark Petruska <pe...@gmail.com>
Authored: Wed Nov 22 21:35:47 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Wed Nov 22 21:35:47 2017 +0900
----------------------------------------------------------------------
.../org/apache/spark/repl/SparkILoop.scala | 75 +++++++++++---------
.../org/apache/spark/repl/SparkILoop.scala | 74 +++++++++++--------
.../scala/org/apache/spark/repl/ReplSuite.scala | 10 +++
3 files changed, 96 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/327d25fe/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index ea279e4..3ce7cc7 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -35,40 +35,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
def this() = this(None, new JPrintWriter(Console.out, true))
+ val initializationCommands: Seq[String] = Seq(
+ """
+ @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+ org.apache.spark.repl.Main.sparkSession
+ } else {
+ org.apache.spark.repl.Main.createSparkSession()
+ }
+ @transient val sc = {
+ val _sc = spark.sparkContext
+ if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+ val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+ if (proxyUrl != null) {
+ println(
+ s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+ } else {
+ println(s"Spark Context Web UI is available at Spark Master Public URL")
+ }
+ } else {
+ _sc.uiWebUrl.foreach {
+ webUrl => println(s"Spark context Web UI available at ${webUrl}")
+ }
+ }
+ println("Spark context available as 'sc' " +
+ s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+ println("Spark session available as 'spark'.")
+ _sc
+ }
+ """,
+ "import org.apache.spark.SparkContext._",
+ "import spark.implicits._",
+ "import spark.sql",
+ "import org.apache.spark.sql.functions._"
+ )
+
def initializeSpark() {
intp.beQuietDuring {
- processLine("""
- @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
- org.apache.spark.repl.Main.sparkSession
- } else {
- org.apache.spark.repl.Main.createSparkSession()
- }
- @transient val sc = {
- val _sc = spark.sparkContext
- if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
- val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
- if (proxyUrl != null) {
- println(
- s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
- } else {
- println(s"Spark Context Web UI is available at Spark Master Public URL")
- }
- } else {
- _sc.uiWebUrl.foreach {
- webUrl => println(s"Spark context Web UI available at ${webUrl}")
- }
- }
- println("Spark context available as 'sc' " +
- s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
- println("Spark session available as 'spark'.")
- _sc
- }
- """)
- processLine("import org.apache.spark.SparkContext._")
- processLine("import spark.implicits._")
- processLine("import spark.sql")
- processLine("import org.apache.spark.sql.functions._")
- replayCommandStack = Nil // remove above commands from session history.
+ savingReplayStack { // remove the commands from session history.
+ initializationCommands.foreach(processLine)
+ }
}
}
@@ -107,6 +112,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
initializeSpark()
echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
}
+
+ override def replay(): Unit = {
+ initializeSpark()
+ super.replay()
+ }
+
}
object SparkILoop {
http://git-wip-us.apache.org/repos/asf/spark/blob/327d25fe/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 900edd6..ffb2e5f 100644
--- a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -32,39 +32,45 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
def this() = this(None, new JPrintWriter(Console.out, true))
+ val initializationCommands: Seq[String] = Seq(
+ """
+ @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
+ org.apache.spark.repl.Main.sparkSession
+ } else {
+ org.apache.spark.repl.Main.createSparkSession()
+ }
+ @transient val sc = {
+ val _sc = spark.sparkContext
+ if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
+ val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
+ if (proxyUrl != null) {
+ println(
+ s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
+ } else {
+ println(s"Spark Context Web UI is available at Spark Master Public URL")
+ }
+ } else {
+ _sc.uiWebUrl.foreach {
+ webUrl => println(s"Spark context Web UI available at ${webUrl}")
+ }
+ }
+ println("Spark context available as 'sc' " +
+ s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
+ println("Spark session available as 'spark'.")
+ _sc
+ }
+ """,
+ "import org.apache.spark.SparkContext._",
+ "import spark.implicits._",
+ "import spark.sql",
+ "import org.apache.spark.sql.functions._"
+ )
+
def initializeSpark() {
intp.beQuietDuring {
- command("""
- @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
- org.apache.spark.repl.Main.sparkSession
- } else {
- org.apache.spark.repl.Main.createSparkSession()
- }
- @transient val sc = {
- val _sc = spark.sparkContext
- if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) {
- val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null)
- if (proxyUrl != null) {
- println(
- s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}")
- } else {
- println(s"Spark Context Web UI is available at Spark Master Public URL")
- }
- } else {
- _sc.uiWebUrl.foreach {
- webUrl => println(s"Spark context Web UI available at ${webUrl}")
- }
- }
- println("Spark context available as 'sc' " +
- s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
- println("Spark session available as 'spark'.")
- _sc
- }
- """)
- command("import org.apache.spark.SparkContext._")
- command("import spark.implicits._")
- command("import spark.sql")
- command("import org.apache.spark.sql.functions._")
+ savingReplayStack { // remove the commands from session history.
+ initializationCommands.foreach(command)
+ }
}
}
@@ -103,6 +109,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
initializeSpark()
echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
}
+
+ override def replay(): Unit = {
+ initializeSpark()
+ super.replay()
+ }
+
}
object SparkILoop {
http://git-wip-us.apache.org/repos/asf/spark/blob/327d25fe/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index c7ae194..905b41c 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -217,4 +217,14 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("error:", output)
assertDoesNotContain("Exception", output)
}
+
+ test(":replay should work correctly") {
+ val output = runInterpreter("local",
+ """
+ |sc
+ |:replay
+ """.stripMargin)
+ assertDoesNotContain("error: not found: value sc", output)
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org