You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2015/05/29 07:46:04 UTC
spark git commit: [SPARK-7931] [STREAMING] Do not restart receiver
when stopped
Repository: spark
Updated Branches:
refs/heads/master db9513789 -> e714ecf27
[SPARK-7931] [STREAMING] Do not restart receiver when stopped
Attempts to restart the socket receiver when it is supposed to be stopped causes undesirable error messages.
Author: Tathagata Das <ta...@gmail.com>
Closes #6483 from tdas/SPARK-7931 and squashes the following commits:
09aeee1 [Tathagata Das] Do not restart receiver when stopped
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e714ecf2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e714ecf2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e714ecf2
Branch: refs/heads/master
Commit: e714ecf277a7412ea8263662977fe3ad1f794975
Parents: db95137
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu May 28 22:39:21 2015 -0700
Committer: Patrick Wendell <pa...@databricks.com>
Committed: Thu May 28 22:39:25 2015 -0700
----------------------------------------------------------------------
.../spark/streaming/dstream/SocketInputDStream.scala | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e714ecf2/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 8b72bcf..96e0a9c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.dstream
+import scala.util.control.NonFatal
+
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NextIterator
@@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag](
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
+ if (!isStopped()) {
+ restart("Socket data stream had no more data")
+ }
logInfo("Stopped receiving")
- restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
- case t: Throwable =>
- restart("Error receiving data", t)
+ case NonFatal(e) =>
+ logWarning("Error receiving data", e)
+ restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org