Posted to commits@spark.apache.org by zs...@apache.org on 2019/01/22 17:35:12 UTC

[spark] branch branch-2.3 updated: [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new b88067b  [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever
b88067b is described below

commit b88067bd0f7b9466a89ce6458cb7766a24283b13
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Tue Jan 22 09:00:52 2019 -0800

    [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever
    
    ## What changes were proposed in this pull request?
    
    `ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large but not enough memory is available. When this happens, `BlockTransferService.fetchBlockSync` currently hangs forever because its `BlockFetchingListener.onBlockFetchSuccess` never completes the `Promise`.
    
    This PR catches `Throwable` and uses the error to complete the `Promise`, so the caller fails fast instead of hanging.
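
    To make the failure mode concrete, here is a minimal sketch of the pattern (illustrative only, not the actual Spark source; `FetchSyncSketch` and its parameters are made-up names):

    ```scala
    import java.nio.ByteBuffer

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration

    object FetchSyncSketch {
      // The callback runs on another thread, as `onBlockFetchSuccess` does.
      def fetchSync(blockSize: Int, data: Array[Byte]): ByteBuffer = {
        val result = Promise[ByteBuffer]()
        new Thread() {
          override def run(): Unit = {
            try {
              // May throw OutOfMemoryError for a large block.
              val ret = ByteBuffer.allocate(blockSize)
              ret.put(data)
              ret.flip()
              result.success(ret)
            } catch {
              // The fix: without this catch, a Throwable from allocate()
              // kills the callback thread, `result` stays incomplete, and
              // the Await below blocks forever.
              case e: Throwable => result.failure(e)
            }
          }
        }.start()
        Await.result(result.future, Duration.Inf)
      }
    }
    ```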
    
    ## How was this patch tested?
    
    Added a unit test. Since I cannot make `ByteBuffer.allocate` throw `OutOfMemoryError` in a test, I passed a negative size to make `ByteBuffer.allocate` fail instead. Although the error type is different (`IllegalArgumentException` rather than `OutOfMemoryError`), it triggers the same code path.
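
    For reference, the JDK behavior the test relies on can be checked standalone (a sketch, not part of the patch):

    ```scala
    import java.nio.ByteBuffer

    import scala.util.Try

    // ByteBuffer.allocate rejects a negative capacity with
    // IllegalArgumentException, so a bad ManagedBuffer reporting size = -1
    // reaches the same catch block that an OutOfMemoryError would.
    val attempt = Try(ByteBuffer.allocate(-1))
    assert(attempt.isFailure)
    assert(attempt.failed.get.isInstanceOf[IllegalArgumentException])
    ```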
    
    Closes #23590 from zsxwing/SPARK-26665.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../spark/network/BlockTransferService.scala       |  12 ++-
 .../spark/network/BlockTransferServiceSuite.scala  | 104 +++++++++++++++++++++
 2 files changed, 112 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index eef8c31..875e4fc 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -105,10 +105,14 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
             case f: FileSegmentManagedBuffer =>
               result.success(f)
             case _ =>
-              val ret = ByteBuffer.allocate(data.size.toInt)
-              ret.put(data.nioByteBuffer())
-              ret.flip()
-              result.success(new NioManagedBuffer(ret))
+              try {
+                val ret = ByteBuffer.allocate(data.size.toInt)
+                ret.put(data.nioByteBuffer())
+                ret.flip()
+                result.success(new NioManagedBuffer(ret))
+              } catch {
+                case e: Throwable => result.failure(e)
+              }
           }
         }
       }, tempFileManager)
diff --git a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
new file mode 100644
index 0000000..d7e4b91
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import org.scalatest.concurrent._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+
+class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {
+
+  implicit val defaultSignaler: Signaler = ThreadSignaler
+
+  test("fetchBlockSync should not hang when BlockFetchingListener.onBlockFetchSuccess fails") {
+    // Create a mocked `BlockTransferService` to call `BlockFetchingListener.onBlockFetchSuccess`
+    // with a bad `ManagedBuffer` which will trigger an exception in `onBlockFetchSuccess`.
+    val blockTransferService = new BlockTransferService {
+      override def init(blockDataManager: BlockDataManager): Unit = {}
+
+      override def close(): Unit = {}
+
+      override def port: Int = 0
+
+      override def hostName: String = "localhost-unused"
+
+      override def fetchBlocks(
+          host: String,
+          port: Int,
+          execId: String,
+          blockIds: Array[String],
+          listener: BlockFetchingListener,
+          tempFileManager: DownloadFileManager): Unit = {
+        // Notify BlockFetchingListener with a bad ManagedBuffer asynchronously
+        new Thread() {
+          override def run(): Unit = {
+            // This is a bad buffer to trigger `IllegalArgumentException` in
+            // `BlockFetchingListener.onBlockFetchSuccess`. The real issue we hit is
+            // `ByteBuffer.allocate` throws `OutOfMemoryError`, but we cannot make it happen in
+            // a test. Instead, we use a negative size value to make `ByteBuffer.allocate` fail,
+            // and this should trigger the same code path as `OutOfMemoryError`.
+            val badBuffer = new ManagedBuffer {
+              override def size(): Long = -1
+
+              override def nioByteBuffer(): ByteBuffer = null
+
+              override def createInputStream(): InputStream = null
+
+              override def retain(): ManagedBuffer = this
+
+              override def release(): ManagedBuffer = this
+
+              override def convertToNetty(): AnyRef = null
+            }
+            listener.onBlockFetchSuccess("block-id-unused", badBuffer)
+          }
+        }.start()
+      }
+
+      override def uploadBlock(
+          hostname: String,
+          port: Int,
+          execId: String,
+          blockId: BlockId,
+          blockData: ManagedBuffer,
+          level: StorageLevel,
+          classTag: ClassTag[_]): Future[Unit] = {
+        // This method is unused in this test
+        throw new UnsupportedOperationException("uploadBlock")
+      }
+    }
+
+    val e = intercept[SparkException] {
+      failAfter(10.seconds) {
+        blockTransferService.fetchBlockSync(
+          "localhost-unused", 0, "exec-id-unused", "block-id-unused", null)
+      }
+    }
+    assert(e.getCause.isInstanceOf[IllegalArgumentException])
+  }
+}

