You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/07/25 00:59:52 UTC

git commit: [SPARK-2464][Streaming] Fixed Twitter stream stopping bug

Repository: spark
Updated Branches:
  refs/heads/master fec641b84 -> a45d5480f


[SPARK-2464][Streaming] Fixed Twitter stream stopping bug

Stopping the Twitter Receiver would call twitter4j's TwitterStream.shutdown, which in turn causes an Exception to be thrown to the listener. This exception caused the Receiver to be restarted. This patch checks whether the receiver has been stopped, and restarts it on an exception only if it has not been stopped.

Author: Tathagata Das <ta...@gmail.com>

Closes #1577 from tdas/twitter-stop and squashes the following commits:

011b525 [Tathagata Das] Fixed Twitter stream stopping bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a45d5480
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a45d5480
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a45d5480

Branch: refs/heads/master
Commit: a45d5480f65d2e969fc7fbd8f358b1717fb99bef
Parents: fec641b
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Jul 24 15:59:09 2014 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jul 24 15:59:09 2014 -0700

----------------------------------------------------------------------
 .../spark/streaming/twitter/TwitterInputDStream.scala       | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a45d5480/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
index 5ea2e55..4eacc47 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala
@@ -63,7 +63,8 @@ class TwitterReceiver(
     storageLevel: StorageLevel
   ) extends Receiver[Status](storageLevel) with Logging {
 
-  private var twitterStream: TwitterStream = _
+  @volatile private var twitterStream: TwitterStream = _
+  @volatile private var stopped = false
 
   def onStart() {
     try {
@@ -78,7 +79,9 @@ class TwitterReceiver(
         def onScrubGeo(l: Long, l1: Long) {}
         def onStallWarning(stallWarning: StallWarning) {}
         def onException(e: Exception) {
-          restart("Error receiving tweets", e)
+          if (!stopped) {
+            restart("Error receiving tweets", e)
+          }
         }
       })
 
@@ -91,12 +94,14 @@ class TwitterReceiver(
       }
       setTwitterStream(newTwitterStream)
       logInfo("Twitter receiver started")
+      stopped = false
     } catch {
       case e: Exception => restart("Error starting Twitter stream", e)
     }
   }
 
   def onStop() {
+    stopped = true
     setTwitterStream(null)
     logInfo("Twitter receiver stopped")
   }