Posted to commits@spark.apache.org by we...@apache.org on 2020/03/04 12:35:28 UTC
[spark] branch branch-3.0 updated: [SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 1a50ee6 [SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit
1a50ee6 is described below
commit 1a50ee6f6467a9fd50160d09d937b7316595af88
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Mar 4 20:21:48 2020 +0800
[SPARK-31017][TEST][CORE] Test for shuffle requests packaging with different size and numBlocks limit
### What changes were proposed in this pull request?
Added 2 tests to `ShuffleBlockFetcherIteratorSuite`.
### Why are the changes needed?
When packaging shuffle fetch requests in `ShuffleBlockFetcherIterator`, there are two limitations: `maxBytesInFlight` and `maxBlocksInFlightPerAddress`. However, we don't have test cases that exercise them together, e.g. where the size limitation is hit before the numBlocks limitation.
We should add test cases in `ShuffleBlockFetcherIteratorSuite` to cover (a sketch of the packaging idea follows this list):
1. the size limitation is hit before the numBlocks limitation
2. the numBlocks limitation is hit before the size limitation
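For context, below is a minimal, self-contained sketch of the packaging behaviour these tests exercise. It is not Spark's actual implementation: the names `Block`, `FetchRequest`, `packageRequests`, `targetRequestSize` and `maxBlocksPerAddress` are illustrative assumptions (in Spark the target request size is derived from `maxBytesInFlight`, and the per-address cap is `maxBlocksInFlightPerAddress`). The idea is that a request is closed as soon as either limit is reached:

```scala
// Illustrative sketch only; names and structure are assumptions, not Spark internals.
object PackagingSketch {
  final case class Block(id: String, size: Long)
  final case class FetchRequest(blocks: Seq[Block]) {
    def totalSize: Long = blocks.map(_.size).sum
  }

  // Split one address's blocks into requests, closing the current request as soon as
  // either the accumulated size reaches targetRequestSize or the block count reaches
  // maxBlocksPerAddress.
  def packageRequests(
      blocks: Seq[Block],
      targetRequestSize: Long,
      maxBlocksPerAddress: Int): Seq[FetchRequest] = {
    val requests = Seq.newBuilder[FetchRequest]
    var current = Vector.empty[Block]
    var currentSize = 0L
    blocks.foreach { b =>
      current = current :+ b
      currentSize += b.size
      if (currentSize >= targetRequestSize || current.size >= maxBlocksPerAddress) {
        requests += FetchRequest(current)
        current = Vector.empty
        currentSize = 0L
      }
    }
    if (current.nonEmpty) requests += FetchRequest(current)
    requests.result()
  }

  def main(args: Array[String]): Unit = {
    // Size limit hit first: two 1000-byte blocks with a 1000-byte target
    // become two single-block requests.
    val bySize = packageRequests(Seq(Block("b1", 1000L), Block("b2", 1000L)), 1000L, Int.MaxValue)
    println(bySize.map(_.blocks.size)) // Vector(1, 1)

    // numBlocks limit hit first: three tiny blocks with a cap of 2 blocks per address
    // become requests of 2 and 1 blocks.
    val byCount = packageRequests(
      Seq(Block("b1", 10L), Block("b2", 10L), Block("b3", 10L)), Long.MaxValue, 2)
    println(byCount.map(_.blocks.size)) // Vector(2, 1)
  }
}
```

The two new tests below check exactly these two orderings, but against the real `ShuffleBlockFetcherIterator`, by also verifying when each deferred `FetchRequest` is actually sent.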
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added new tests.
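For local verification (assuming a standard Spark checkout), the suite can be run with the usual SBT invocation, e.g. `build/sbt "core/testOnly org.apache.spark.storage.ShuffleBlockFetcherIteratorSuite"`.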
Closes #27767 from Ngone51/add_test.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 87b93d32a6bfb0f2127019b97b3fc1d13e16a10b)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../storage/ShuffleBlockFetcherIteratorSuite.scala | 92 ++++++++++++++++++++++
1 file changed, 92 insertions(+)
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 45f47c7..2090a51 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -254,6 +254,98 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
intercept[FetchFailedException] { iterator.next() }
}
+ test("Hit maxBytesInFlight limitation before maxBlocksInFlightPerAddress") {
+ val blockManager = mock(classOf[BlockManager])
+ val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+ doReturn(localBmId).when(blockManager).blockManagerId
+
+ val remoteBmId1 = BlockManagerId("test-remote-client-1", "test-remote-host1", 1)
+ val remoteBmId2 = BlockManagerId("test-remote-client-2", "test-remote-host2", 2)
+ val blockId1 = ShuffleBlockId(0, 1, 0)
+ val blockId2 = ShuffleBlockId(1, 1, 0)
+ val blocksByAddress = Seq(
+ (remoteBmId1, Seq((blockId1, 1000L, 0))),
+ (remoteBmId2, Seq((blockId2, 1000L, 0)))).toIterator
+ val transfer = createMockTransfer(Map(
+ blockId1 -> createMockManagedBuffer(1000),
+ blockId2 -> createMockManagedBuffer(1000)))
+ val taskContext = TaskContext.empty()
+ val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+ val iterator = new ShuffleBlockFetcherIterator(
+ taskContext,
+ transfer,
+ blockManager,
+ blocksByAddress,
+ (_, in) => in,
+ 1000L, // allow at most 1 FetchRequest in flight at a time
+ Int.MaxValue,
+ Int.MaxValue, // set maxBlocksInFlightPerAddress to Int.MaxValue
+ Int.MaxValue,
+ true,
+ false,
+ metrics,
+ false)
+ // After initialize() we'll have 2 FetchRequests, each of 1000 bytes. So only the
+ // first FetchRequest can be sent, and the second one will hit maxBytesInFlight so
+ // it won't be sent yet.
+ verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
+ assert(iterator.hasNext)
+ // next() will trigger sending the deferred request
+ iterator.next()
+ // the second FetchRequest should have been sent by this time
+ verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), any())
+ assert(iterator.hasNext)
+ iterator.next()
+ assert(!iterator.hasNext)
+ }
+
+ test("Hit maxBlocksInFlightPerAddress limitation before maxBytesInFlight") {
+ val blockManager = mock(classOf[BlockManager])
+ val localBmId = BlockManagerId("test-client", "test-local-host", 1)
+ doReturn(localBmId).when(blockManager).blockManagerId
+
+ val remoteBmId = BlockManagerId("test-remote-client-1", "test-remote-host", 2)
+ val blockId1 = ShuffleBlockId(0, 1, 0)
+ val blockId2 = ShuffleBlockId(0, 2, 0)
+ val blockId3 = ShuffleBlockId(0, 3, 0)
+ val blocksByAddress = Seq((remoteBmId,
+ Seq((blockId1, 1000L, 0), (blockId2, 1000L, 0), (blockId3, 1000L, 0)))).toIterator
+ val transfer = createMockTransfer(Map(
+ blockId1 -> createMockManagedBuffer(),
+ blockId2 -> createMockManagedBuffer(),
+ blockId3 -> createMockManagedBuffer()))
+ val taskContext = TaskContext.empty()
+ val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+ val iterator = new ShuffleBlockFetcherIterator(
+ taskContext,
+ transfer,
+ blockManager,
+ blocksByAddress,
+ (_, in) => in,
+ Int.MaxValue, // set maxBytesInFlight to Int.MaxValue
+ Int.MaxValue,
+ 2, // set maxBlocksInFlightPerAddress to 2
+ Int.MaxValue,
+ true,
+ false,
+ metrics,
+ false)
+ // After initialize(), we'll have 2 FetchRequests: one with 2 blocks and another with only
+ // one block. So only the first FetchRequest can be sent. The second FetchRequest will hit
+ // maxBlocksInFlightPerAddress so it won't be sent yet.
+ verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
+ // The first request packaged 2 blocks, so we need to call next() twice
+ // to consume them before the deferred second request is sent.
+ assert(iterator.hasNext)
+ iterator.next()
+ assert(iterator.hasNext)
+ iterator.next()
+ verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), any())
+ assert(iterator.hasNext)
+ iterator.next()
+ assert(!iterator.hasNext)
+ }
+
test("fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads") {
val blockManager = mock(classOf[BlockManager])
val localBmId = BlockManagerId("test-client", "test-local-host", 1)