You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2019/01/22 17:35:12 UTC
[spark] branch branch-2.3 updated: [SPARK-26665][CORE] Fix a bug
that BlockTransferService.fetchBlockSync may hang forever
This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new b88067b [SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever
b88067b is described below
commit b88067bd0f7b9466a89ce6458cb7766a24283b13
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Tue Jan 22 09:00:52 2019 -0800
[SPARK-26665][CORE] Fix a bug that BlockTransferService.fetchBlockSync may hang forever
## What changes were proposed in this pull request?
`ByteBuffer.allocate` may throw `OutOfMemoryError` when the block is large but not enough memory is available. However, when this happens, right now `BlockTransferService.fetchBlockSync` will just hang forever as its `BlockFetchingListener.onBlockFetchSuccess` doesn't complete the `Promise`.
This PR catches `Throwable` and uses the error to complete `Promise`.
## How was this patch tested?
Added a unit test. Since I cannot make `ByteBuffer.allocate` throw `OutOfMemoryError`, I passed a negative size to make `ByteBuffer.allocate` fail. Although the error type is different, it should trigger the same code path.
Closes #23590 from zsxwing/SPARK-26665.
Authored-by: Shixiong Zhu <zs...@gmail.com>
Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
.../spark/network/BlockTransferService.scala | 12 ++-
.../spark/network/BlockTransferServiceSuite.scala | 104 +++++++++++++++++++++
2 files changed, 112 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index eef8c31..875e4fc 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -105,10 +105,14 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
case f: FileSegmentManagedBuffer =>
result.success(f)
case _ =>
- val ret = ByteBuffer.allocate(data.size.toInt)
- ret.put(data.nioByteBuffer())
- ret.flip()
- result.success(new NioManagedBuffer(ret))
+ try {
+ val ret = ByteBuffer.allocate(data.size.toInt)
+ ret.put(data.nioByteBuffer())
+ ret.flip()
+ result.success(new NioManagedBuffer(ret))
+ } catch {
+ case e: Throwable => result.failure(e)
+ }
}
}
}, tempFileManager)
diff --git a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
new file mode 100644
index 0000000..d7e4b91
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.reflect.ClassTag
+
+import org.scalatest.concurrent._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
+import org.apache.spark.storage.{BlockId, StorageLevel}
+
+class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits {
+
+ implicit val defaultSignaler: Signaler = ThreadSignaler
+
+ test("fetchBlockSync should not hang when BlockFetchingListener.onBlockFetchSuccess fails") {
+ // Stub `BlockTransferService` whose `fetchBlocks` hands the listener a bad `ManagedBuffer`,
+ // so that `BlockFetchingListener.onBlockFetchSuccess` fails when it handles the buffer.
+ val blockTransferService = new BlockTransferService {
+ override def init(blockDataManager: BlockDataManager): Unit = {}
+
+ override def close(): Unit = {}
+
+ override def port: Int = 0
+
+ override def hostName: String = "localhost-unused"
+
+ override def fetchBlocks(
+ host: String,
+ port: Int,
+ execId: String,
+ blockIds: Array[String],
+ listener: BlockFetchingListener,
+ tempFileManager: DownloadFileManager): Unit = {
+ // Invoke the listener from a separate thread so the callback is delivered asynchronously.
+ new Thread() {
+ override def run(): Unit = {
+ // `size()` returns -1, so the `ByteBuffer.allocate` call made from
+ // `BlockFetchingListener.onBlockFetchSuccess` throws `IllegalArgumentException`.
+ // The real issue we hit is `ByteBuffer.allocate` throwing `OutOfMemoryError`, but
+ // that cannot be made to happen in a test. A negative size makes the same call fail
+ // and should trigger the same code path as `OutOfMemoryError`.
+ val badBuffer = new ManagedBuffer {
+ override def size(): Long = -1
+
+ override def nioByteBuffer(): ByteBuffer = null
+
+ override def createInputStream(): InputStream = null
+
+ override def retain(): ManagedBuffer = this
+
+ override def release(): ManagedBuffer = this
+
+ override def convertToNetty(): AnyRef = null
+ }
+ listener.onBlockFetchSuccess("block-id-unused", badBuffer)
+ }
+ }.start()
+ }
+
+ override def uploadBlock(
+ hostname: String,
+ port: Int,
+ execId: String,
+ blockId: BlockId,
+ blockData: ManagedBuffer,
+ level: StorageLevel,
+ classTag: ClassTag[_]): Future[Unit] = {
+ // Not exercised by this test; throws so that an unexpected call is not silently ignored.
+ throw new UnsupportedOperationException("uploadBlock")
+ }
+ }
+
+ val e = intercept[SparkException] {
+ failAfter(10.seconds) {
+ blockTransferService.fetchBlockSync(
+ "localhost-unused", 0, "exec-id-unused", "block-id-unused", null)
+ }
+ }
+ assert(e.getCause.isInstanceOf[IllegalArgumentException])
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org