Posted to commits@spark.apache.org by we...@apache.org on 2020/03/04 12:35:28 UTC

[spark] branch branch-3.0 updated: [SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1a50ee6  [SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit
1a50ee6 is described below

commit 1a50ee6f6467a9fd50160d09d937b7316595af88
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Mar 4 20:21:48 2020 +0800

    [SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit
    
    ### What changes were proposed in this pull request?
    
    Added 2 tests to `ShuffleBlockFetcherIteratorSuite`.
    
    ### Why are the changes needed?
    
    When packaging shuffle fetch requests in `ShuffleBlockFetcherIterator`, there are two limits: `maxBytesInFlight` and `maxBlocksInFlightPerAddress`. However, we don't have test cases that exercise the interaction between them, e.g. the case where the size limit is hit before the numBlocks limit (a simplified sketch of the packaging logic follows the list below).
    
    We should add test cases in `ShuffleBlockFetcherIteratorSuite` to test:
    
    1. the size limitation is hit before the numBlocks limitation
    2. the numBlocks limitation is hit before the size limitation
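
    A minimal sketch of the request-packaging behavior these two cases exercise (illustrative only: `Block`, `FetchRequest` and `packageRequests` here are hypothetical stand-ins, not Spark's actual internals; in Spark the per-request size target is derived from `maxBytesInFlight`):

    ```scala
    // Hypothetical, simplified model of how blocks from one address are
    // grouped into fetch requests under the two limits.
    case class Block(id: String, size: Long)
    case class FetchRequest(blocks: Seq[Block])

    def packageRequests(
        blocks: Seq[Block],
        targetRequestSize: Long,          // derived from maxBytesInFlight in Spark
        maxBlocksInFlightPerAddress: Int): Seq[FetchRequest] = {
      val requests = Seq.newBuilder[FetchRequest]
      var current = Vector.empty[Block]
      var currentBytes = 0L
      for (block <- blocks) {
        current :+= block
        currentBytes += block.size
        // Seal the request as soon as either limit is reached, whichever
        // comes first -- exactly the two orderings the new tests cover.
        if (currentBytes >= targetRequestSize ||
            current.size >= maxBlocksInFlightPerAddress) {
          requests += FetchRequest(current)
          current = Vector.empty
          currentBytes = 0L
        }
      }
      if (current.nonEmpty) requests += FetchRequest(current)
      requests.result()
    }

    // E.g. three 1000-byte blocks with maxBlocksInFlightPerAddress = 2 yield
    // two requests (2 blocks + 1 block), matching the second test below.
    ```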
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new tests.
    
    Closes #27767 from Ngone51/add_test.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 87b93d32a6bfb0f2127019b97b3fc1d13e16a10b)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 92 ++++++++++++++++++++++
 1 file changed, 92 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 45f47c7..2090a51 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -254,6 +254,98 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
+  test("Hit maxBytesInFlight limitation before maxBlocksInFlightPerAddress") {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    val remoteBmId1 = BlockManagerId("test-remote-client-1", "test-remote-host1", 1)
+    val remoteBmId2 = BlockManagerId("test-remote-client-2", "test-remote-host2", 2)
+    val blockId1 = ShuffleBlockId(0, 1, 0)
+    val blockId2 = ShuffleBlockId(1, 1, 0)
+    val blocksByAddress = Seq(
+      (remoteBmId1, Seq((blockId1, 1000L, 0))),
+      (remoteBmId2, Seq((blockId2, 1000L, 0)))).toIterator
+    val transfer = createMockTransfer(Map(
+      blockId1 -> createMockManagedBuffer(1000),
+      blockId2 -> createMockManagedBuffer(1000)))
+    val taskContext = TaskContext.empty()
+    val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+    val iterator = new ShuffleBlockFetcherIterator(
+      taskContext,
+      transfer,
+      blockManager,
+      blocksByAddress,
+      (_, in) => in,
+      1000L, // set maxBytesInFlight to 1000 to allow at most one 1000-byte request in flight
+      Int.MaxValue,
+      Int.MaxValue, // set maxBlocksInFlightPerAddress to Int.MaxValue
+      Int.MaxValue,
+      true,
+      false,
+      metrics,
+      false)
+    // After initialize() we'll have 2 FetchRequests of 1000 bytes each. Only the
+    // first FetchRequest can be sent; the second would exceed maxBytesInFlight,
+    // so it is deferred rather than sent.
+    verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
+    assert(iterator.hasNext)
+    // next() will trigger sending the deferred request
+    iterator.next()
+    // the second (deferred) FetchRequest should be sent at this point
+    verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), any())
+    assert(iterator.hasNext)
+    iterator.next()
+    assert(!iterator.hasNext)
+  }
+
+  test("Hit maxBlocksInFlightPerAddress limitation before maxBytesInFlight") {
+    val blockManager = mock(classOf[BlockManager])
+    val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+
+    val remoteBmId = BlockManagerId("test-remote-client-1", "test-remote-host", 2)
+    val blockId1 = ShuffleBlockId(0, 1, 0)
+    val blockId2 = ShuffleBlockId(0, 2, 0)
+    val blockId3 = ShuffleBlockId(0, 3, 0)
+    val blocksByAddress = Seq((remoteBmId,
+      Seq((blockId1, 1000L, 0), (blockId2, 1000L, 0), (blockId3, 1000L, 0)))).toIterator
+    val transfer = createMockTransfer(Map(
+      blockId1 -> createMockManagedBuffer(),
+      blockId2 -> createMockManagedBuffer(),
+      blockId3 -> createMockManagedBuffer()))
+    val taskContext = TaskContext.empty()
+    val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+    val iterator = new ShuffleBlockFetcherIterator(
+      taskContext,
+      transfer,
+      blockManager,
+      blocksByAddress,
+      (_, in) => in,
+      Int.MaxValue, // set maxBytesInFlight to Int.MaxValue
+      Int.MaxValue,
+      2, // set maxBlocksInFlightPerAddress to 2
+      Int.MaxValue,
+      true,
+      false,
+      metrics,
+      false)
+    // After initialize(), we'll have 2 FetchRequests: one contains 2 blocks and the
+    // other contains only 1 block. Only the first FetchRequest can be sent; the second
+    // would exceed maxBlocksInFlightPerAddress, so it is deferred rather than sent.
+    verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
+    // the first request packaged 2 blocks, so we need to call next() twice
+    // to consume them before the deferred request is sent.
+    assert(iterator.hasNext)
+    iterator.next()
+    assert(iterator.hasNext)
+    iterator.next()
+    verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), any())
+    assert(iterator.hasNext)
+    iterator.next()
+    assert(!iterator.hasNext)
+  }
+
   test("fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads") {
     val blockManager = mock(classOf[BlockManager])
     val localBmId = BlockManagerId("test-client", "test-local-host", 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org