You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/01/20 10:41:22 UTC
spark git commit: [SPARK-4803] [streaming] Remove duplicate
RegisterReceiver message
Repository: spark
Updated Branches:
refs/heads/master debc03195 -> 4afad9c77
[SPARK-4803] [streaming] Remove duplicate RegisterReceiver message
- The ReceiverTracker receivers `RegisterReceiver` messages two times
1) When the actor at `ReceiverSupervisorImpl`'s preStart is invoked
2) After the receiver is started at the executor `onReceiverStart()` at `ReceiverSupervisorImpl`
Though, RegisterReceiver message uses the same streamId and the receiverInfo gets updated everytime
the message is processed at the `ReceiverTracker`, it makes sense to call register receiver only after the
receiver is started.
Author: Ilayaperumal Gopinathan <ig...@pivotal.io>
Closes #3648 from ilayaperumalg/RTActor-remove-prestart and squashes the following commits:
868efab [Ilayaperumal Gopinathan] Increase receiverInfo collector timeout to 2 secs
3118e5e [Ilayaperumal Gopinathan] Fix StreamingListenerSuite's startedReceiverStreamIds size
634abde [Ilayaperumal Gopinathan] Remove duplicate RegisterReceiver message
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4afad9c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4afad9c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4afad9c7
Branch: refs/heads/master
Commit: 4afad9c7702239f6d5b1b49dc48ee08580964e17
Parents: debc031
Author: Ilayaperumal Gopinathan <ig...@pivotal.io>
Authored: Tue Jan 20 01:41:10 2015 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Jan 20 01:41:10 2015 -0800
----------------------------------------------------------------------
.../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 7 -------
.../org/apache/spark/streaming/StreamingListenerSuite.scala | 4 ++--
2 files changed, 2 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4afad9c7/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 3b1233e..d7229c2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -77,13 +77,6 @@ private[streaming] class ReceiverSupervisorImpl(
/** Akka actor for receiving messages from the ReceiverTracker in the driver */
private val actor = env.actorSystem.actorOf(
Props(new Actor {
- override def preStart() {
- logInfo("Registered receiver " + streamId)
- val msg = RegisterReceiver(
- streamId, receiver.getClass.getSimpleName, Utils.localHostName(), self)
- val future = trackerActor.ask(msg)(askTimeout)
- Await.result(future, askTimeout)
- }
override def receive() = {
case StopReceiver =>
http://git-wip-us.apache.org/repos/asf/spark/blob/4afad9c7/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 84fed95..f52562b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -73,8 +73,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
- eventually(timeout(1000 millis), interval(20 millis)) {
- collector.startedReceiverStreamIds.size should be >= 1
+ eventually(timeout(2000 millis), interval(20 millis)) {
+ collector.startedReceiverStreamIds.size should equal (1)
collector.startedReceiverStreamIds(0) should equal (0)
collector.stoppedReceiverStreamIds should have size 1
collector.stoppedReceiverStreamIds(0) should equal (0)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org