You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/10/28 07:31:53 UTC

git commit: [SPARK-4064] NioBlockTransferService.fetchBlocks may cause Spark to hang.

Repository: spark
Updated Branches:
  refs/heads/master 0c34fa5b4 -> 7c0c26cd1


[SPARK-4064] NioBlockTransferService.fetchBlocks may cause Spark to hang.

cc @rxin

Author: GuoQiang Li <wi...@qq.com>

Closes #2929 from witgo/SPARK-4064 and squashes the following commits:

20110f2 [GuoQiang Li] Modify the exception msg
3425225 [GuoQiang Li] review commits
2b07e49 [GuoQiang Li] If we create a lot of big broadcast variables, Spark may hang


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c0c26cd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c0c26cd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c0c26cd

Branch: refs/heads/master
Commit: 7c0c26cd1241e1fde3c6f1f659a43b9c40ee3d42
Parents: 0c34fa5
Author: GuoQiang Li <wi...@qq.com>
Authored: Mon Oct 27 23:31:46 2014 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Oct 27 23:31:46 2014 -0700

----------------------------------------------------------------------
 .../network/nio/NioBlockTransferService.scala   | 25 ++++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c0c26cd/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
index 5add4fc..e311320 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala
@@ -95,16 +95,21 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
     future.onSuccess { case message =>
       val bufferMessage = message.asInstanceOf[BufferMessage]
       val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-
-      for (blockMessage <- blockMessageArray) {
-        if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
-          listener.onBlockFetchFailure(
-            new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId"))
-        } else {
-          val blockId = blockMessage.getId
-          val networkSize = blockMessage.getData.limit()
-          listener.onBlockFetchSuccess(
-            blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
+      // SPARK-4064: In some cases (e.g. the remote block was removed), blockMessageArray may be empty.
+      if (blockMessageArray.isEmpty) {
+        listener.onBlockFetchFailure(
+          new SparkException(s"Received empty message from $cmId"))
+      } else {
+        for (blockMessage <- blockMessageArray) {
+          val msgType = blockMessage.getType
+          if (msgType != BlockMessage.TYPE_GOT_BLOCK) {
+            listener.onBlockFetchFailure(
+              new SparkException(s"Unexpected message ${msgType} received from $cmId"))
+          } else {
+            val blockId = blockMessage.getId
+            listener.onBlockFetchSuccess(
+              blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
+          }
         }
       }
     }(cm.futureExecContext)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org