You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/01/20 10:41:22 UTC

spark git commit: [SPARK-4803] [streaming] Remove duplicate RegisterReceiver message

Repository: spark
Updated Branches:
  refs/heads/master debc03195 -> 4afad9c77


[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message

  - The ReceiverTracker receives the `RegisterReceiver` message twice:
     1) When `preStart` of the actor in `ReceiverSupervisorImpl` is invoked
     2) After the receiver is started on the executor, in `onReceiverStart()` of `ReceiverSupervisorImpl`

Although the RegisterReceiver message uses the same streamId and the receiverInfo gets updated every time
the message is processed at the `ReceiverTracker`, it makes sense to register the receiver only after the
receiver is started.

Author: Ilayaperumal Gopinathan <ig...@pivotal.io>

Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits:

868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs
3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size
634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4afad9c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4afad9c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4afad9c7

Branch: refs/heads/master
Commit: 4afad9c7702239f6d5b1b49dc48ee08580964e17
Parents: debc031
Author: Ilayaperumal Gopinathan <ig...@pivotal.io>
Authored: Tue Jan 20 01:41:10 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jan 20 01:41:10 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/receiver/ReceiverSupervisorImpl.scala     | 7 -------
 .../org/apache/spark/streaming/StreamingListenerSuite.scala   | 4 ++--
 2 files changed, 2 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4afad9c7/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 3b1233e..d7229c2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl(
   /** Akka actor for receiving messages from the ReceiverTracker in the driver */
   private val actor = env.actorSystem.actorOf(
     Props(new Actor {
-      override def preStart() {
-        logInfo("Registered receiver " + streamId)
-        val msg = RegisterReceiver(
-          streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
-        val future = trackerActor.ask(msg)(askTimeout)
-        Await.result(future, askTimeout)
-      }
 
       override def receive() = {
         case StopReceiver =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4afad9c7/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 84fed95..f52562b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 
     ssc.start()
     try {
-      eventually(timeout(1000 millis), interval(20 millis)) {
-        collector.startedReceiverStreamIds.size should be >= 1
+      eventually(timeout(2000 millis), interval(20 millis)) {
+        collector.startedReceiverStreamIds.size should equal (1)
         collector.startedReceiverStreamIds(0) should equal (0)
         collector.stoppedReceiverStreamIds should have size 1
         collector.stoppedReceiverStreamIds(0) should equal (0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org