You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/03 06:13:56 UTC

spark git commit: [SPARK-7997][CORE] Add rpcEnv.awaitTermination() back to SparkEnv

Repository: spark
Updated Branches:
  refs/heads/master 055714661 -> 335f10eda


[SPARK-7997][CORE] Add rpcEnv.awaitTermination() back to SparkEnv

`rpcEnv.awaitTermination()` was not added in #10854 because some Streaming Python tests hung forever.

This patch fixed the hung issue and added rpcEnv.awaitTermination() back to SparkEnv.

Previously, Streaming Kafka Python tests shutdowns the zookeeper server before stopping StreamingContext. Then when stopping StreamingContext, KafkaReceiver may be hung due to https://issues.apache.org/jira/browse/KAFKA-601, hence, some thread of RpcEnv's Dispatcher cannot exit and rpcEnv.awaitTermination is hung.The patch just changed the shutdown order to fix it.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #11031 from zsxwing/awaitTermination.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/335f10ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/335f10ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/335f10ed

Branch: refs/heads/master
Commit: 335f10edad8c759bad3dbd0660ed4dd5d70ddd8b
Parents: 0557146
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Tue Feb 2 21:13:54 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Feb 2 21:13:54 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 +
 python/pyspark/streaming/tests.py                   | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/335f10ed/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 12c7b20..9461afd 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -91,6 +91,7 @@ class SparkEnv (
       metricsSystem.stop()
       outputCommitCoordinator.stop()
       rpcEnv.shutdown()
+      rpcEnv.awaitTermination()
 
       // Note that blockTransferService is stopped by BlockManager since it is started by it.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/335f10ed/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 24b8126..b33e825 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1013,12 +1013,12 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.setup()
 
     def tearDown(self):
+        super(KafkaStreamTests, self).tearDown()
+
         if self._kafkaTestUtils is not None:
             self._kafkaTestUtils.teardown()
             self._kafkaTestUtils = None
 
-        super(KafkaStreamTests, self).tearDown()
-
     def _randomTopic(self):
         return "topic-%d" % random.randint(0, 10000)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org