You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/12/24 00:15:00 UTC

spark git commit: [SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered

Repository: spark
Updated Branches:
  refs/heads/master 96281cd0c -> 10d69e9cb


[SPARK-4802] [streaming] Remove receiverInfo once receiver is de-registered

  Once the streaming receiver is de-registered at executor, the `ReceiverTrackerActor` needs to
remove the corresponding reveiverInfo from the `receiverInfo` map at `ReceiverTracker`.

Author: Ilayaperumal Gopinathan <ig...@pivotal.io>

Closes #3647 from ilayaperumalg/receiverInfo-RTracker and squashes the following commits:

6eb97d5 [Ilayaperumal Gopinathan] Polishing based on the review
3640c86 [Ilayaperumal Gopinathan] Remove receiverInfo once receiver is de-registered


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10d69e9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10d69e9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10d69e9c

Branch: refs/heads/master
Commit: 10d69e9cbfdabe95d0e513176d5347d7b59da0ee
Parents: 96281cd
Author: Ilayaperumal Gopinathan <ig...@pivotal.io>
Authored: Tue Dec 23 15:14:54 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Dec 23 15:14:54 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/10d69e9c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 32e481d..1f0e442 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -150,8 +150,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         logWarning("No prior receiver info")
         ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
     }
-    receiverInfo(streamId) = newReceiverInfo
-    listenerBus.post(StreamingListenerReceiverStopped(receiverInfo(streamId)))
+    receiverInfo -= streamId
+    listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
     val messageWithError = if (error != null && !error.isEmpty) {
       s"$message - $error"
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org