You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/09 02:52:32 UTC
[spark] branch master updated: [SPARK-26713][CORE][FOLLOWUP] revert
the partial fix in ShuffleBlockFetcherIterator
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d41bd7c [SPARK-26713][CORE][FOLLOWUP] revert the partial fix in ShuffleBlockFetcherIterator
d41bd7c is described below
commit d41bd7c8910a960dec5b8605f1cb5d607a4ce958
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Jul 9 11:52:12 2019 +0900
[SPARK-26713][CORE][FOLLOWUP] revert the partial fix in ShuffleBlockFetcherIterator
## What changes were proposed in this pull request?
This PR reverts the partial bug fix in `ShuffleBlockFetcherIterator` that was introduced by https://github.com/apache/spark/pull/23638.
The reasons:
1. The reverted change only addresses a potential bug: after `PipelinedRDD` was fixed in #23638, the original problem was already resolved.
2. The fix is incomplete, according to [the discussion](https://github.com/apache/spark/pull/23638#discussion_r251869084).
We should fix the potential bug completely in a follow-up.
## How was this patch tested?
existing tests
Closes #25049 from cloud-fan/revert.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../storage/ShuffleBlockFetcherIterator.scala | 16 ++----
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 58 ----------------------
2 files changed, 3 insertions(+), 71 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index a4d91a7..a283757 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -142,14 +142,7 @@ final class ShuffleBlockFetcherIterator(
/**
* Whether the iterator is still active. If isZombie is true, the callback interface will no
- * longer place fetched blocks into [[results]] and the iterator is marked as fully consumed.
- *
- * When the iterator is inactive, [[hasNext]] and [[next]] calls will honor that as there are
- * cases the iterator is still being consumed. For example, ShuffledRDD + PipedRDD if the
- * subprocess command is failed. The task will be marked as failed, then the iterator will be
- * cleaned up at task completion, the [[next]] call (called in the stdin writer thread of
- * PipedRDD if not exited yet) may hang at [[results.take]]. The defensive check in [[hasNext]]
- * and [[next]] reduces the possibility of such race conditions.
+ * longer place fetched blocks into [[results]].
*/
@GuardedBy("this")
private[this] var isZombie = false
@@ -388,7 +381,7 @@ final class ShuffleBlockFetcherIterator(
logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
}
- override def hasNext: Boolean = !isZombie && (numBlocksProcessed < numBlocksToFetch)
+ override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
/**
* Fetches the next (BlockId, InputStream). If a task fails, the ManagedBuffers
@@ -412,7 +405,7 @@ final class ShuffleBlockFetcherIterator(
// then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
// is also corrupt, so the previous stage could be retried.
// For local shuffle block, throw FailureFetchResult for the first IOException.
- while (!isZombie && result == null) {
+ while (result == null) {
val startFetchWait = System.nanoTime()
result = results.take()
val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
@@ -505,9 +498,6 @@ final class ShuffleBlockFetcherIterator(
fetchUpToMaxBytes()
}
- if (result == null) { // the iterator is already closed/cleaned up.
- throw new NoSuchElementException()
- }
currentResult = result.asInstanceOf[SuccessFetchResult]
(currentResult.blockId,
new BufferReleasingInputStream(
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 3ab2f0b..ed40244 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -215,64 +215,6 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
verify(blocks(ShuffleBlockId(0, 2, 0)), times(0)).release()
}
- test("iterator is all consumed if task completes early") {
- val blockManager = mock(classOf[BlockManager])
- val localBmId = BlockManagerId("test-client", "test-client", 1)
- doReturn(localBmId).when(blockManager).blockManagerId
-
- // Make sure remote blocks would return
- val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
- val blocks = Map[BlockId, ManagedBuffer](
- ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(),
- ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(),
- ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer())
-
- // Semaphore to coordinate event sequence in two different threads.
- val sem = new Semaphore(0)
-
- val transfer = mock(classOf[BlockTransferService])
- when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
- .thenAnswer((invocation: InvocationOnMock) => {
- val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
- Future {
- // Return the first two blocks, and wait till task completion before returning the last
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 1, 0).toString, blocks(ShuffleBlockId(0, 1, 0)))
- sem.acquire()
- listener.onBlockFetchSuccess(
- ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0)))
- }
- })
-
- val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
- (remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)).toIterator
-
- val taskContext = TaskContext.empty()
- val iterator = new ShuffleBlockFetcherIterator(
- taskContext,
- transfer,
- blockManager,
- blocksByAddress,
- (_, in) => in,
- 48 * 1024 * 1024,
- Int.MaxValue,
- Int.MaxValue,
- Int.MaxValue,
- true,
- false,
- taskContext.taskMetrics.createTempShuffleReadMetrics())
-
-
- assert(iterator.hasNext)
- iterator.next()
-
- taskContext.markTaskCompleted(None)
- sem.release()
- assert(iterator.hasNext === false)
- }
-
test("fail all blocks if any of the remote request fails") {
val blockManager = mock(classOf[BlockManager])
val localBmId = BlockManagerId("test-client", "test-client", 1)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org