You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/09/20 22:53:34 UTC

spark git commit: [SPARK-18838][HOTFIX][YARN] Check internal context state before stopping it.

Repository: spark
Updated Branches:
  refs/heads/master ce6a71e01 -> bb9c0697d


[SPARK-18838][HOTFIX][YARN] Check internal context state before stopping it.

The live listener bus now cleans up after itself and releases listeners
after stopping, so code cannot get references to listeners after the
Spark context is stopped.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #19297 from vanzin/SPARK-18838.hotfix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb9c0697
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb9c0697
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb9c0697

Branch: refs/heads/master
Commit: bb9c0697d509880d30c874d7f6384d7633aed061
Parents: ce6a71e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 20 15:53:28 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Sep 20 15:53:28 2017 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/YarnClusterSuite.scala    | 74 ++++++++++----------
 1 file changed, 37 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb9c0697/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index fc78bc4..d5de190 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -390,47 +390,47 @@ private object YarnClusterDriver extends Logging with Matchers {
         .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
         .collect()
       assert(configFromExecutors.find(_ == null) === None)
-    } finally {
-      Files.write(result, status, StandardCharsets.UTF_8)
-      sc.stop()
-    }
 
-    // verify log urls are present
-    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
-    assert(listeners.size === 1)
-    val listener = listeners(0)
-    val executorInfos = listener.addedExecutorInfos.values
-    assert(executorInfos.nonEmpty)
-    executorInfos.foreach { info =>
-      assert(info.logUrlMap.nonEmpty)
-      info.logUrlMap.values.foreach { url =>
-        val log = Source.fromURL(url).mkString
-        assert(
-          !log.contains(SECRET_PASSWORD),
-          s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
-        )
+      // verify log urls are present
+      val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+      assert(listeners.size === 1)
+      val listener = listeners(0)
+      val executorInfos = listener.addedExecutorInfos.values
+      assert(executorInfos.nonEmpty)
+      executorInfos.foreach { info =>
+        assert(info.logUrlMap.nonEmpty)
+        info.logUrlMap.values.foreach { url =>
+          val log = Source.fromURL(url).mkString
+          assert(
+            !log.contains(SECRET_PASSWORD),
+            s"Executor logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
+          )
+        }
       }
-    }
 
-    // If we are running in yarn-cluster mode, verify that driver logs links and present and are
-    // in the expected format.
-    if (conf.get("spark.submit.deployMode") == "cluster") {
-      assert(listener.driverLogs.nonEmpty)
-      val driverLogs = listener.driverLogs.get
-      assert(driverLogs.size === 2)
-      assert(driverLogs.contains("stderr"))
-      assert(driverLogs.contains("stdout"))
-      val urlStr = driverLogs("stderr")
-      driverLogs.foreach { kv =>
-        val log = Source.fromURL(kv._2).mkString
-        assert(
-          !log.contains(SECRET_PASSWORD),
-          s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
-        )
+      // If we are running in yarn-cluster mode, verify that driver logs links and present and are
+      // in the expected format.
+      if (conf.get("spark.submit.deployMode") == "cluster") {
+        assert(listener.driverLogs.nonEmpty)
+        val driverLogs = listener.driverLogs.get
+        assert(driverLogs.size === 2)
+        assert(driverLogs.contains("stderr"))
+        assert(driverLogs.contains("stdout"))
+        val urlStr = driverLogs("stderr")
+        driverLogs.foreach { kv =>
+          val log = Source.fromURL(kv._2).mkString
+          assert(
+            !log.contains(SECRET_PASSWORD),
+            s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} "
+          )
+        }
+        val containerId = YarnSparkHadoopUtil.get.getContainerId
+        val user = Utils.getCurrentUserName()
+        assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
       }
-      val containerId = YarnSparkHadoopUtil.get.getContainerId
-      val user = Utils.getCurrentUserName()
-      assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096"))
+    } finally {
+      Files.write(result, status, StandardCharsets.UTF_8)
+      sc.stop()
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org