You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2019/07/09 02:52:32 UTC

[spark] branch master updated: [SPARK-26713][CORE][FOLLOWUP] revert the partial fix in ShuffleBlockFetcherIterator

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d41bd7c  [SPARK-26713][CORE][FOLLOWUP] revert the partial fix in ShuffleBlockFetcherIterator
d41bd7c is described below

commit d41bd7c8910a960dec5b8605f1cb5d607a4ce958
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Jul 9 11:52:12 2019 +0900

    [SPARK-26713][CORE][FOLLOWUP] revert the partial fix in ShuffleBlockFetcherIterator
    
    ## What changes were proposed in this pull request?
    
    This PR reverts the partial bug fix in `ShuffleBlockFetcherIterator` which was introduced by https://github.com/apache/spark/pull/23638 .
    The reasons:
    1. It addresses only a potential bug. After `PipedRDD` was fixed in #23638, the original problem was resolved.
    2. The fix is incomplete according to [the discussion](https://github.com/apache/spark/pull/23638#discussion_r251869084)
    
    We should fix the potential bug completely later.
    
    ## How was this patch tested?
    
    existing tests
    
    Closes #25049 from cloud-fan/revert.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../storage/ShuffleBlockFetcherIterator.scala      | 16 ++----
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 58 ----------------------
 2 files changed, 3 insertions(+), 71 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index a4d91a7..a283757 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -142,14 +142,7 @@ final class ShuffleBlockFetcherIterator(
 
   /**
    * Whether the iterator is still active. If isZombie is true, the callback interface will no
-   * longer place fetched blocks into [[results]] and the iterator is marked as fully consumed.
-   *
-   * When the iterator is inactive, [[hasNext]] and [[next]] calls will honor that as there are
-   * cases the iterator is still being consumed. For example, ShuffledRDD + PipedRDD if the
-   * subprocess command is failed. The task will be marked as failed, then the iterator will be
-   * cleaned up at task completion, the [[next]] call (called in the stdin writer thread of
-   * PipedRDD if not exited yet) may hang at [[results.take]]. The defensive check in [[hasNext]]
-   * and [[next]] reduces the possibility of such race conditions.
+   * longer place fetched blocks into [[results]].
    */
   @GuardedBy("this")
   private[this] var isZombie = false
@@ -388,7 +381,7 @@ final class ShuffleBlockFetcherIterator(
     logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
   }
 
-  override def hasNext: Boolean = !isZombie && (numBlocksProcessed < numBlocksToFetch)
+  override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
 
   /**
    * Fetches the next (BlockId, InputStream). If a task fails, the ManagedBuffers
@@ -412,7 +405,7 @@ final class ShuffleBlockFetcherIterator(
     // then fetch it one more time if it's corrupt, throw FailureFetchResult if the second fetch
     // is also corrupt, so the previous stage could be retried.
     // For local shuffle block, throw FailureFetchResult for the first IOException.
-    while (!isZombie && result == null) {
+    while (result == null) {
       val startFetchWait = System.nanoTime()
       result = results.take()
       val fetchWaitTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait)
@@ -505,9 +498,6 @@ final class ShuffleBlockFetcherIterator(
       fetchUpToMaxBytes()
     }
 
-    if (result == null) { // the iterator is already closed/cleaned up.
-      throw new NoSuchElementException()
-    }
     currentResult = result.asInstanceOf[SuccessFetchResult]
     (currentResult.blockId,
       new BufferReleasingInputStream(
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 3ab2f0b..ed40244 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -215,64 +215,6 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(blocks(ShuffleBlockId(0, 2, 0)), times(0)).release()
   }
 
-  test("iterator is all consumed if task completes early") {
-    val blockManager = mock(classOf[BlockManager])
-    val localBmId = BlockManagerId("test-client", "test-client", 1)
-    doReturn(localBmId).when(blockManager).blockManagerId
-
-    // Make sure remote blocks would return
-    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
-    val blocks = Map[BlockId, ManagedBuffer](
-      ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer(),
-      ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer(),
-      ShuffleBlockId(0, 2, 0) -> createMockManagedBuffer())
-
-    // Semaphore to coordinate event sequence in two different threads.
-    val sem = new Semaphore(0)
-
-    val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first two blocks, and wait till task completion before returning the last
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, blocks(ShuffleBlockId(0, 1, 0)))
-          sem.acquire()
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, blocks(ShuffleBlockId(0, 2, 0)))
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1.asInstanceOf[Long])).toSeq)).toIterator
-
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
-      transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => in,
-      48 * 1024 * 1024,
-      Int.MaxValue,
-      Int.MaxValue,
-      Int.MaxValue,
-      true,
-      false,
-      taskContext.taskMetrics.createTempShuffleReadMetrics())
-
-
-    assert(iterator.hasNext)
-    iterator.next()
-
-    taskContext.markTaskCompleted(None)
-    sem.release()
-    assert(iterator.hasNext === false)
-  }
-
   test("fail all blocks if any of the remote request fails") {
     val blockManager = mock(classOf[BlockManager])
     val localBmId = BlockManagerId("test-client", "test-client", 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org