You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/01/31 19:48:25 UTC

spark git commit: [SPARK-21525][STREAMING] Check error code from supervisor RPC.

Repository: spark
Updated Branches:
  refs/heads/master 8c21170de -> dd242bad3


[SPARK-21525][STREAMING] Check error code from supervisor RPC.

The code was ignoring the error code from the AddBlock RPC, which
means that a failure to write to the WAL was being ignored by the
receiver, and would lead to the block being acked (in the case of
the Flume receiver) and data potentially lost.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20161 from vanzin/SPARK-21525.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd242bad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd242bad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd242bad

Branch: refs/heads/master
Commit: dd242bad39cc6df7ff6c6b16642bdc92dccca6ac
Parents: 8c21170
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Jan 31 11:48:19 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Jan 31 11:48:19 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 4 +++-
 .../org/apache/spark/streaming/scheduler/ReceiverTracker.scala   | 3 ++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd242bad/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 27644a6..5d38c56 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -159,7 +159,9 @@ private[streaming] class ReceiverSupervisorImpl(
     logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
     val numRecords = blockStoreResult.numRecords
     val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
-    trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
+    if (!trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))) {
+      throw new SparkException("Failed to add block to receiver tracker.")
+    }
     logDebug(s"Reported block $blockId")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dd242bad/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 6f130c8..c74ca19 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -521,7 +521,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
               if (active) {
                 context.reply(addBlock(receivedBlockInfo))
               } else {
-                throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
+                context.sendFailure(
+                  new IllegalStateException("ReceiverTracker RpcEndpoint already shut down."))
               }
             }
           })


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org