Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/28 20:17:56 UTC

[GitHub] [spark] xkrogen opened a new pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

xkrogen opened a new pull request #32389:
URL: https://github.com/apache/spark/pull/32389


   ### What changes were proposed in this pull request?
   Introduce new shared methods to `ShuffleBlockFetcherIteratorSuite` to replace copy-pasted code. Use modern, Scala-like Mockito `Answer` syntax.
   
   ### Why are the changes needed?
   `ShuffleBlockFetcherIteratorSuite` has a lot of duplicated code, like https://github.com/apache/spark/blob/0494dc90af48ce7da0625485a4dc6917a244d580/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala#L172-L185 . It's challenging to tell which parts are interesting and which are just being set to some default/unused value.
   
   Similarly, though not as bad, there are many repeated calls like the following:
   ```
   verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
   when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer ...
   ```
   
   These changes result in about 10% reduction in both lines and characters in the file:
   ```bash
   # Before
   > wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
       1063    3950   43201 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
   
   # After
   > wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
        928    3609   39053 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
   ```
   
   It also helps readability, e.g.:
   ```
       val iterator = createShuffleBlockIteratorWithDefaults(
         transfer,
         blocksByAddress,
         maxBytesInFlight = 1000L
       )
   ```
   Now I can clearly tell that `maxBytesInFlight` is the main parameter we're interested in here.
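   
   A minimal sketch of the pattern, not taken from the PR itself: `FetcherConfig` and `createWithDefaults` are hypothetical stand-ins for a class with many positional constructor arguments and for the factory helper. Each parameter gets a permissive default, so a call site names only the value the test cares about.
   
   ```scala
   // Hypothetical stand-in for a class that takes many positional constructor arguments.
   final case class FetcherConfig(
       maxBytesInFlight: Long,
       maxReqsInFlight: Int,
       maxBlocksInFlightPerAddress: Int,
       detectCorrupt: Boolean,
       doBatchFetch: Boolean)
   
   object FetcherConfigDefaults {
     // Every parameter has a default, so callers override only what matters to them
     // and pass it by name.
     def createWithDefaults(
         maxBytesInFlight: Long = Long.MaxValue,
         maxReqsInFlight: Int = Int.MaxValue,
         maxBlocksInFlightPerAddress: Int = Int.MaxValue,
         detectCorrupt: Boolean = true,
         doBatchFetch: Boolean = false): FetcherConfig =
       FetcherConfig(
         maxBytesInFlight,
         maxReqsInFlight,
         maxBlocksInFlightPerAddress,
         detectCorrupt,
         doBatchFetch)
   }
   ```
   
   A call such as `FetcherConfigDefaults.createWithDefaults(maxBytesInFlight = 1000L)` leaves every other field at its default.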
   
   ### Does this PR introduce _any_ user-facing change?
   No, test only. There aren't even any behavior changes, just refactoring.
   
   ### How was this patch tested?
   Unit tests pass.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843721497


   Merging to master, thanks @xkrogen.
   Thanks for the reviews @Ngone51 , @otterc !




[GitHub] [spark] Ngone51 commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r622749490



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1061,3 +857,73 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     assert(mergedBlock.size === inputBlocks.map(_.size).sum)
   }
 }
+
+object ShuffleBlockFetcherIteratorSuite {
+
+  private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
+
+  private def answerFetchBlocks(transfer: BlockTransferService)(answer: Answer[Unit]): Unit =
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer(answer)
+
+  private def verifyFetchBlocksCount(transfer: BlockTransferService, expectedCount: Int): Unit =

Review comment:
       nit: `verifyFetchBlocksInvocationCount` ?

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1061,3 +857,73 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     assert(mergedBlock.size === inputBlocks.map(_.size).sum)
   }
 }
+
+object ShuffleBlockFetcherIteratorSuite {

Review comment:
       Why did you put these functions in the object? I think it's fine to put them in the class.

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -704,39 +577,25 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
 
     val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first block, and then fail.
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
-          sem.release()
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq)).toIterator
+    answerFetchBlocks(transfer) { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        // Return the first block, and then fail.
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+        sem.release()
+      }
+    }
 
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
+    val iterator = createShuffleBlockIteratorWithDefaults(
       transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 100),
-      48 * 1024 * 1024,

Review comment:
       The default `Int.MaxValue` works too?

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -270,38 +231,26 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val transfer = createMockTransfer(Map(
       blockId1 -> createMockManagedBuffer(1000),
       blockId2 -> createMockManagedBuffer(1000)))
-    val taskContext = TaskContext.empty()
-    val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
+    val iterator = createShuffleBlockIteratorWithDefaults(
       transfer,
-      blockManager,
       blocksByAddress,
-      (_, in) => in,
-      1000L, // allow 1 FetchRequests at most at the same time

Review comment:
       Shall we keep this comment?






[GitHub] [spark] mridulm commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r634521407



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       Interesting, thanks for catching it!






[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828795103


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42574/
   




[GitHub] [spark] xkrogen commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-832083814


   @Ngone51 do you have any more comments here? Thanks a lot for your comments so far!




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828770214


   **[Test build #138055 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138055/testReport)** for PR 32389 at commit [`c96a16e`](https://github.com/apache/spark/commit/c96a16e074f8ba4a25cf4f70c274f43dd516e0f7).




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829454866


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42607/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828794499


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42573/
   




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r629498336



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,54 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  /**
+   * Get a blockByAddress iterator for a single BlockManagerId assuming all blocks have the same
+   * size and `blockMapId`.
+   */
+  private def getBlocksByAddressForSingleBM(

Review comment:
       Good idea! I focused on the single-BM case because it was simpler, but you're right that there was still a lot of common logic to be reduced. Actually, your suggestion made me realize that we only ever use this method (and the new multi-BM method I created) to create an iterator which is then passed to `getShuffleIteratorWithDefaults`, so I just made `getShuffleIteratorWithDefaults` directly accept a `Map`. I think it's very clean now, thank you for the suggestion!






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-837177734


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138338/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828785421








[GitHub] [spark] mridulm commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r632825877



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       super nit:
   ```suggestion
         (_, in) => new LimitedInputStream(in, streamWrapperLimitSize.getOrElse(Long.MaxValue))
   ```
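   
   A self-contained sketch comparing the two forms, under stated assumptions: `CappedInputStream` is a hypothetical stand-in for `LimitedInputStream`, and a `String` stands in for `BlockId`. One difference the sketch makes visible is that the `Option`-based form returns the input stream untouched when no limit is given, whereas the `getOrElse(Long.MaxValue)` form always wraps it.
   
   ```scala
   import java.io.{FilterInputStream, InputStream}
   
   // Hypothetical stand-in for a size-limiting stream: reports end-of-stream after `limit` bytes.
   final class CappedInputStream(underlying: InputStream, limit: Long)
       extends FilterInputStream(underlying) {
     private var remaining: Long = limit
   
     override def read(): Int =
       if (remaining <= 0) -1
       else {
         val b = super.read()
         if (b >= 0) remaining -= 1
         b
       }
   
     override def read(buf: Array[Byte], off: Int, len: Int): Int =
       if (len == 0) 0
       else if (remaining <= 0) -1
       else {
         val n = super.read(buf, off, math.min(len.toLong, remaining).toInt)
         if (n > 0) remaining -= n
         n
       }
   }
   
   object StreamWrappers {
     // Shape of the code under review: wrap only when a limit was supplied.
     def optionForm(limit: Option[Long]): (String, InputStream) => InputStream =
       limit
         .map(l => (_: String, in: InputStream) => new CappedInputStream(in, l): InputStream)
         .getOrElse((_: String, in: InputStream) => in)
   
     // Shape of the suggestion: always wrap, using Long.MaxValue when no limit was supplied.
     def alwaysWrapForm(limit: Option[Long]): (String, InputStream) => InputStream =
       (_, in) => new CappedInputStream(in, limit.getOrElse(Long.MaxValue))
   }
   ```
   
   Whether that difference matters depends on whether a test inspects the identity or type of the returned stream; the sketch does not claim that any test in the suite does.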

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,

Review comment:
       We are assuming that all blocks for a block manager are of the same size ...
   This is how the code in this suite currently happens to be written, but it is not a general expectation.
   It is something that could require a future change to this method: do we want to make this explicit in the parameter?
   
   Use `Map[BlockManagerId, Seq[(BlockId, Long, Int)]]` instead, matching what is in `ShuffleBlockFetcherIterator`, with a util method to convert current usages to this form (essentially, pull this conversion into a method and delegate to it for all current usages)?
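   
   A rough sketch of what such a conversion util might look like, with assumptions called out: `toBlocksByAddress` is a hypothetical name, `String` stands in for both `BlockManagerId` and `BlockId` so the snippet compiles on its own, and `Iterable` is used in place of `Traversable`.
   
   ```scala
   object BlocksByAddress {
     // Stand-in aliases; in Spark these would be BlockManagerId and BlockId.
     type BmId = String
     type BlkId = String
   
     // Expands the compact per-manager form (blocks, shared size, shared map index)
     // into one (blockId, size, mapIndex) tuple per block.
     def toBlocksByAddress(
         compact: Map[BmId, (Iterable[BlkId], Long, Int)]): Map[BmId, Seq[(BlkId, Long, Int)]] =
       compact.map { case (bmId, (blocks, blockSize, mapIndex)) =>
         bmId -> blocks.map(blockId => (blockId, blockSize, mapIndex)).toSeq
       }
   }
   ```
   
   With this split, tests whose blocks share a size keep the compact form, while a test that needs per-block sizes could build the expanded form directly.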

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),

Review comment:
       nit: `tContext.taskMetrics.createTempShuffleReadMetrics()`

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -703,40 +600,24 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val sem = new Semaphore(0)
     val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
 
-    val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first block, and then fail.
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
-          sem.release()
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq)).toIterator
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        // Return the first block, and then fail.
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+        sem.release()
+      }
+    }
 
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
-      transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 100),
-      48 * 1024 * 1024,
-      Int.MaxValue,
-      Int.MaxValue,
-      Int.MaxValue,
-      true,
-      true,
-      taskContext.taskMetrics.createTempShuffleReadMetrics(),
-      false)
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId ->(blocks.keys, 1L, 0)),

Review comment:
       super nit: space after `->` (here and below)






[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828762506


   **[Test build #138054 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138054/testReport)** for PR 32389 at commit [`322593f`](https://github.com/apache/spark/commit/322593f373c4ae531e0f129750c077de4b71fb4f).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842587391


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43161/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-836989725


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42860/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828838284


   **[Test build #138055 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138055/testReport)** for PR 32389 at commit [`c96a16e`](https://github.com/apache/spark/commit/c96a16e074f8ba4a25cf4f70c274f43dd516e0f7).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] mridulm commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r632823782



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],

Review comment:
       nit: I was going back and forth on whether this should be a `Map` or a `Seq`. For now a `Map` is fine given how `ShuffleBlockFetcherIterator` is used, but we might revisit this in the future.
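
   To illustrate the trade-off, here is a minimal sketch. It is not code from the PR: plain strings and ints are hypothetical stand-ins for `BlockManagerId` and the per-address block lists. A `Seq` of pairs can list the same address more than once and keeps insertion order, while a `Map` holds exactly one entry per address:

   ```scala
   object MapVsSeqSketch {
     // A Seq may mention "bm1" twice; both entries survive.
     val asSeq: Seq[(String, Seq[Int])] =
       Seq("bm1" -> Seq(1), "bm2" -> Seq(2), "bm1" -> Seq(3))

     // Converting to a Map keeps only the last value for a repeated key.
     val asMap: Map[String, Seq[Int]] = asSeq.toMap
   }
   ```

   So a `Map` parameter is only equivalent as long as no test needs the same address to appear in more than one entry.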






[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842661645


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138641/
   




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633729925



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),

Review comment:
       `taskMetrics()` is defined as an empty-paren method, so it should be called with parentheses:
   ```
     def taskMetrics(): TaskMetrics
   ```
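
   As a minimal sketch of the general convention (the `Counter` class is hypothetical and not part of Spark), the call site mirrors the declaration: a method declared with `()` is called with `()`, and a parameterless accessor is called without:

   ```scala
   final class Counter {
     private var count = 0

     // Declared with empty parens, so callers write increment().
     def increment(): Unit = count += 1

     // Declared without parens, so callers write current.
     def current: Int = count
   }

   object EmptyParenSketch {
     def run(): Int = {
       val c = new Counter
       c.increment()
       c.increment()
       c.current
     }
   }
   ```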






[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-839056329


   **[Test build #138386 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138386/testReport)** for PR 32389 at commit [`b2abb87`](https://github.com/apache/spark/commit/b2abb87f3dfafc03ade66d3c3bc333440fde761a).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633725651



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       good suggestion! incorporated.






[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r623195101



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -704,39 +577,25 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
 
     val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first block, and then fail.
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
-          sem.release()
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq)).toIterator
+    answerFetchBlocks(transfer) { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        // Return the first block, and then fail.
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+        sem.release()
+      }
+    }
 
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
+    val iterator = createShuffleBlockIteratorWithDefaults(
       transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 100),
-      48 * 1024 * 1024,

Review comment:
       Yeah. I think this 48MB value was just copy-pasted as a placeholder. It's too large to affect any of the tests.
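
   The pattern behind this cleanup can be sketched as follows. This is an illustration only: `FetcherConfig` and `createConfigWithDefaults` are hypothetical names, not the helper from the PR. Defaults are set to "effectively unlimited" values, so a placeholder such as `48 * 1024 * 1024` (50331648 bytes) no longer has to be repeated, and each test names only the limits it actually exercises:

   ```scala
   // Hypothetical stand-in for a constructor with many positional parameters.
   final case class FetcherConfig(
       maxBytesInFlight: Long,
       maxReqsInFlight: Int,
       detectCorrupt: Boolean)

   object FetcherDefaults {
     // Every limit defaults to its maximum, so omitting an argument
     // never constrains the test by accident.
     def createConfigWithDefaults(
         maxBytesInFlight: Long = Long.MaxValue,
         maxReqsInFlight: Int = Int.MaxValue,
         detectCorrupt: Boolean = true): FetcherConfig = {
       FetcherConfig(maxBytesInFlight, maxReqsInFlight, detectCorrupt)
     }
   }
   ```

   A test that cares only about the in-flight byte limit would then call `FetcherDefaults.createConfigWithDefaults(maxBytesInFlight = 1000L)`.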






[GitHub] [spark] HyukjinKwon commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828873256


   cc @Ngone51 @mridulm FYI




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r634595619



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,47 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.toIterator,
+      (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),
+      doBatchFetch)
+  }
+  // scalastyle:on argcount
+
+  /**
+   * Convert a list of block IDs into a list of blocks with metadata, assuming all blocks have the
+   * same size and index.
+   */
+  private def toBlockList(blockIds: Traversable[BlockId], blockSize: Long, blockMapIndex: Int)
+  : Seq[(BlockId, Long, Int)] = {

Review comment:
       I think it should actually be 2 spaces based on this [Databricks Scala Guide](https://github.com/databricks/scala-style-guide) example:
   ```
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration)
     : RDD[(K, V)] = {
     // method body
   }
   ```
   But I agree this format is a little confusing. I'll reformat.
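
   One possible layout is sketched below. The thread does not say which form was finally chosen, so this is an assumption: each parameter sits on its own line and the return type follows the closing parenthesis. `BlockId` is a hypothetical stand-in for Spark's class, and `Iterable` replaces `Traversable` only to keep the sketch self-contained:

   ```scala
   object ToBlockListSketch {
     // Hypothetical stand-in for org.apache.spark.storage.BlockId.
     final case class BlockId(name: String)

     // Attach the same size and map index to every block ID.
     def toBlockList(
         blockIds: Iterable[BlockId],
         blockSize: Long,
         blockMapIndex: Int): Seq[(BlockId, Long, Int)] = {
       blockIds.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq
     }
   }
   ```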






[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-836979529








[GitHub] [spark] Ngone51 commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r628664801



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,54 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  /**
+   * Get a blockByAddress iterator for a single BlockManagerId assuming all blocks have the same
+   * size and `blockMapId`.
+   */
+  private def getBlocksByAddressForSingleBM(
+      blockManagerId: BlockManagerId,
+      blocks: Traversable[BlockId],
+      blockSize: Long,
+      blockMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
+    Seq(
+      (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapId)).toSeq)
+    ).toIterator
+  }
+
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress,
+      streamWrapperLimitSize
+          .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+          .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       nit: use a 2-space indent for these continuation lines






[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633726039



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -703,40 +600,24 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val sem = new Semaphore(0)
     val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
 
-    val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first block, and then fail.
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
-          sem.release()
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq)).toIterator
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        // Return the first block, and then fail.
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+        sem.release()
+      }
+    }
 
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
-      transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 100),
-      48 * 1024 * 1024,
-      Int.MaxValue,
-      Int.MaxValue,
-      Int.MaxValue,
-      true,
-      true,
-      taskContext.taskMetrics.createTempShuffleReadMetrics(),
-      false)
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId ->(blocks.keys, 1L, 0)),

Review comment:
       thanks!






[GitHub] [spark] otterc commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r630293317



##########
File path: .idea/vcs.xml
##########
@@ -1,24 +1,16 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
--->

Review comment:
       Nit: This change seems to have been picked up by mistake.






[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r623187485



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -270,38 +231,26 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val transfer = createMockTransfer(Map(
       blockId1 -> createMockManagedBuffer(1000),
       blockId2 -> createMockManagedBuffer(1000)))
-    val taskContext = TaskContext.empty()
-    val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
+    val iterator = createShuffleBlockIteratorWithDefaults(
       transfer,
-      blockManager,
       blocksByAddress,
-      (_, in) => in,
-      1000L, // allow 1 FetchRequests at most at the same time

Review comment:
       Definitely, good catch!






[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828762528


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138054/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842661645


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138641/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843489993


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43211/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828770214


   **[Test build #138055 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138055/testReport)** for PR 32389 at commit [`c96a16e`](https://github.com/apache/spark/commit/c96a16e074f8ba4a25cf4f70c274f43dd516e0f7).




[GitHub] [spark] Ngone51 commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r628664369



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,54 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  /**
+   * Get a blockByAddress iterator for a single BlockManagerId assuming all blocks have the same
+   * size and `blockMapId`.
+   */
+  private def getBlocksByAddressForSingleBM(
+      blockManagerId: BlockManagerId,
+      blocks: Traversable[BlockId],
+      blockSize: Long,
+      blockMapId: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {

Review comment:
       This should be `mapIndex` instead of `mapId`. Could you rename it to `blockMapIndex`?






[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-836989725


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42860/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829410682


   **[Test build #138087 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138087/testReport)** for PR 32389 at commit [`596c3dd`](https://github.com/apache/spark/commit/596c3dda2b3d20f7c35f0dd897c2ebbe6ff77ddb).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828762528


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138054/
   




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r623186492



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1061,3 +857,73 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     assert(mergedBlock.size === inputBlocks.map(_.size).sum)
   }
 }
+
+object ShuffleBlockFetcherIteratorSuite {

Review comment:
       Sure, I guess there is no strong reason. They can be static / located in the object but it's just a test so it doesn't matter much either way.

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -1061,3 +857,73 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     assert(mergedBlock.size === inputBlocks.map(_.size).sum)
   }
 }
+
+object ShuffleBlockFetcherIteratorSuite {
+
+  private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
+
+  private def answerFetchBlocks(transfer: BlockTransferService)(answer: Answer[Unit]): Unit =
+    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())).thenAnswer(answer)
+
+  private def verifyFetchBlocksCount(transfer: BlockTransferService, expectedCount: Int): Unit =

Review comment:
       Yes, good call.






[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843549102


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138690/
   




[GitHub] [spark] xkrogen commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-836915607


   Great comments @Ngone51 ! Pushed up a new set of commits addressing your comments.




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842583641


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43161/
   




[GitHub] [spark] otterc commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838683440


   @xkrogen @Ngone51 @mridulm Gentle ping folks to check if this PR can be merged. To me it looks like it is ready.
   I need to rework the tests in my [PR](https://github.com/apache/spark/pull/32140) once this is merged.




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633725651



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       Good suggestion! Incorporated.

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -703,40 +600,24 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val sem = new Semaphore(0)
     val corruptLocalBuffer = new FileSegmentManagedBuffer(null, new File("a"), 0, 100)
 
-    val transfer = mock(classOf[BlockTransferService])
-    when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-        Future {
-          // Return the first block, and then fail.
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
-          listener.onBlockFetchSuccess(
-            ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
-          sem.release()
-        }
-      })
-
-    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
-      (remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq)).toIterator
+    answerFetchBlocks { invocation =>
+      val listener = invocation.getArgument[BlockFetchingListener](4)
+      Future {
+        // Return the first block, and then fail.
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 0, 0).toString, blocks(ShuffleBlockId(0, 0, 0)))
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 1, 0).toString, mockCorruptBuffer())
+        listener.onBlockFetchSuccess(
+          ShuffleBlockId(0, 2, 0).toString, corruptLocalBuffer)
+        sem.release()
+      }
+    }
 
-    val taskContext = TaskContext.empty()
-    val iterator = new ShuffleBlockFetcherIterator(
-      taskContext,
-      transfer,
-      blockManager,
-      blocksByAddress,
-      (_, in) => new LimitedInputStream(in, 100),
-      48 * 1024 * 1024,
-      Int.MaxValue,
-      Int.MaxValue,
-      Int.MaxValue,
-      true,
-      true,
-      taskContext.taskMetrics.createTempShuffleReadMetrics(),
-      false)
+    val iterator = createShuffleBlockIteratorWithDefaults(
+      Map(remoteBmId ->(blocks.keys, 1L, 0)),

Review comment:
       thanks!

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,

Review comment:
       I think this is closely related to your [other comment](https://github.com/apache/spark/pull/32389#discussion_r632823782), so I'll respond to both here.
   
   I agree that this is tailored to the current set of tests and thus current usage, as opposed to potential future usage. If we were designing a public API, or even a private API in the production code, I would agree with you. But in this case, for a class-private method in a test file, I'm not convinced that designing for future possibilities is the right move. If someone later adds tests which do need blocks of different sizes, they can make the modification you've described, right? For now I would prefer simplicity, and we can introduce the additional complexity if necessary.
   
   WDYT? This is not a strong conviction on my side so if you are not convinced I can make changes as you proposed.
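   
   To make the trade-off concrete, here is a minimal sketch using simplified stand-in types (`BmId`, `BlkId`) rather than Spark's `BlockManagerId` and `BlockId`, so that it compiles without Spark. `expandUniform` mirrors the input shape the helper accepts today; `expandPerBlock` is a hypothetical variant that a later test could switch to if it needed blocks of different sizes. All names in the sketch are illustrative and not part of this PR.
   
   ```scala
   object BlocksByAddressSketch {
     // Stand-ins for BlockManagerId and BlockId (assumption: not the real Spark classes).
     final case class BmId(name: String)
     final case class BlkId(name: String)
   
     // Current shape: one block size and one map index shared by every block at an address.
     def expandUniform(
         blocksByAddress: Map[BmId, (Iterable[BlkId], Long, Int)])
       : Iterator[(BmId, Seq[(BlkId, Long, Int)])] = {
       blocksByAddress.map { case (bmId, (blocks, blockSize, blockMapIndex)) =>
         (bmId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
       }.iterator
     }
   
     // Hypothetical generalization: every block carries its own size and map index,
     // so the caller spells out each triple and no expansion is needed.
     def expandPerBlock(
         blocksByAddress: Map[BmId, Seq[(BlkId, Long, Int)]])
       : Iterator[(BmId, Seq[(BlkId, Long, Int)])] = blocksByAddress.iterator
   }
   ```
   
   With the uniform shape a call site only states the block IDs once plus a single size and index; the per-block shape is more flexible but pushes that repetition back into every test.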

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),

Review comment:
       `taskMetrics()` is an empty-paren method so it should be called with parentheses:
   ```
     def taskMetrics(): TaskMetrics
   ```






[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633729925



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),

Review comment:
       `taskMetrics()` is an empty-paren method so it should be called with parentheses:
   ```
     def taskMetrics(): TaskMetrics
   ```
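   
   For illustration, a small self-contained sketch of the convention, with hypothetical `Context` and `Metrics` classes standing in for `TaskContext` and `TaskMetrics`. As far as I know, Scala 2 still accepts the call without parentheses (newer 2.13 releases warn about it), while Scala 3 rejects it for methods defined in Scala 3 code, so writing the parentheses keeps the call site portable.
   
   ```scala
   object EmptyParenSketch {
     // Hypothetical stand-ins; not the real Spark classes.
     final class Metrics {
       // Declared without parentheses: a pure accessor, called without them.
       def name: String = "metrics"
     }
   
     final class Context {
       // Declared with empty parentheses, like `def taskMetrics(): TaskMetrics`.
       def taskMetrics(): Metrics = new Metrics
     }
   
     // The call site matches each declaration: `taskMetrics()` with parens, `name` without.
     def metricsName(ctx: Context): String = ctx.taskMetrics().name
   }
   ```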






[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-836917482


   **[Test build #138338 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138338/testReport)** for PR 32389 at commit [`196cb06`](https://github.com/apache/spark/commit/196cb06f34d43292ecb33aae3334dcd1d51946fe).




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829451867


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42607/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843503770


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43211/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-837177734


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138338/
   




[GitHub] [spark] asfgit closed pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #32389:
URL: https://github.com/apache/spark/pull/32389


   




[GitHub] [spark] xkrogen commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-844281219


   Thanks for the reviews @mridulm , @otterc, and @Ngone51 ! Much appreciated.




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828761011


   **[Test build #138054 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138054/testReport)** for PR 32389 at commit [`322593f`](https://github.com/apache/spark/commit/322593f373c4ae531e0f129750c077de4b71fb4f).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829532742


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138087/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829410682


   **[Test build #138087 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138087/testReport)** for PR 32389 at commit [`596c3dd`](https://github.com/apache/spark/commit/596c3dda2b3d20f7c35f0dd897c2ebbe6ff77ddb).




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838842568


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42909/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829514839


   **[Test build #138087 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138087/testReport)** for PR 32389 at commit [`596c3dd`](https://github.com/apache/spark/commit/596c3dda2b3d20f7c35f0dd897c2ebbe6ff77ddb).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842587363


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43161/
   




[GitHub] [spark] xkrogen commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829382591


   Thanks @Ngone51 for the suggestions!
   
   I put up two more commits. The first pretty directly answers your comments and has some other small fixes I noticed.
   
   The second one is a bit larger and was inspired by your comment about moving the helper methods from the `object` to the `class`. All of the tests create `val transfer = mock(classOf[BlockTransferService])` and then pass it around to the helper methods. I moved this to a field instead, so `transfer` no longer needs to be passed in the method signatures. I think it's cleaner, but I'm open to feedback on that part and happy to revert if you or others don't find it useful.
   
   It does reduce the size by another ~40 lines:
   ```
   > wc core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
        894    3509   37951 core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
   ```
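
As a rough illustration of the second commit described above, the sketch below contrasts passing a collaborator into every helper with holding it as a field of the suite. It does not use Mockito or Spark: `TransferService`, `HelpersBefore`, and `SuiteAfter` are hypothetical stand-ins for `BlockTransferService` and the real helper methods, chosen only so that the snippet compiles on its own.

```scala
// Hypothetical stand-in for BlockTransferService; not the Spark API.
trait TransferService {
  def fetch(blockId: String): String
}

// Before: helpers live in an object, so each one must be handed the
// transfer service explicitly.
object HelpersBefore {
  def createIterator(transfer: TransferService, blockIds: Seq[String]): Iterator[String] =
    blockIds.iterator.map(id => transfer.fetch(id))
}

// After: the transfer service is a field of the suite class, so helpers
// defined on the class can use it without an extra parameter.
class SuiteAfter {
  val transfer: TransferService = new TransferService {
    def fetch(blockId: String): String = s"data-for-$blockId"
  }

  def createIterator(blockIds: Seq[String]): Iterator[String] =
    blockIds.iterator.map(id => transfer.fetch(id))
}
```

With the field in place, a call site shrinks from `HelpersBefore.createIterator(transfer, ids)` to `createIterator(ids)`. One thing to watch with this pattern is that state on a shared field can leak between tests unless it is recreated or reset for each test.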




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-839059542


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138386/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842644710


   **[Test build #138641 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138641/testReport)** for PR 32389 at commit [`4fcc6be`](https://github.com/apache/spark/commit/4fcc6be8e55e7b12aec0bd414419af159c0b9f31).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842587391


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43161/
   




[GitHub] [spark] Ngone51 commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828961924


   This is a great refactor. I left some minor comments. Overall, looks good to me.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829456833


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42607/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838856369


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42909/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842548288


   **[Test build #138641 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138641/testReport)** for PR 32389 at commit [`4fcc6be`](https://github.com/apache/spark/commit/4fcc6be8e55e7b12aec0bd414419af159c0b9f31).




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r629467040



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -228,117 +272,72 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(blockManager.hostLocalDirManager).thenReturn(Some(hostLocalDirManager))
     when(mockExternalBlockStoreClient.getHostLocalDirs(any(), any(), any(), any()))
       .thenAnswer { invocation =>
-        val completableFuture = invocation.getArguments()(3)
-          .asInstanceOf[CompletableFuture[java.util.Map[String, Array[String]]]]
-        completableFuture.completeExceptionally(new Throwable("failed fetch"))
+        invocation.getArgument[CompletableFuture[java.util.Map[String, Array[String]]]](3)
+          .completeExceptionally(new Throwable("failed fetch"))
       }
 
     blockManager.hostLocalDirManager = Some(hostLocalDirManager)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
       (hostLocalBmId, hostLocalBlocks.keys.map(blockId => (blockId, 1L, 1)).toSeq)
     ).toIterator

Review comment:
       it should be used here, good catch!






[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-837148852


   **[Test build #138338 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138338/testReport)** for PR 32389 at commit [`196cb06`](https://github.com/apache/spark/commit/196cb06f34d43292ecb33aae3334dcd1d51946fe).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843399879


   **[Test build #138690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138690/testReport)** for PR 32389 at commit [`eea80f5`](https://github.com/apache/spark/commit/eea80f5c8c487a7117036b7cfb963137a4b7eeb5).




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r630300027



##########
File path: .idea/vcs.xml
##########
@@ -1,24 +1,16 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
--->

Review comment:
       Oops! Good catch @otterc, thank you!






[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828795066








[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828795103


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42574/
   




[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828761011


   **[Test build #138054 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138054/testReport)** for PR 32389 at commit [`322593f`](https://github.com/apache/spark/commit/322593f373c4ae531e0f129750c077de4b71fb4f).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828846565


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138055/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843549102


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138690/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829456833


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42607/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838850182


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42909/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-842548288


   **[Test build #138641 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138641/testReport)** for PR 32389 at commit [`4fcc6be`](https://github.com/apache/spark/commit/4fcc6be8e55e7b12aec0bd414419af159c0b9f31).




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828846565


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138055/
   




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838856369


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42909/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838776322


   **[Test build #138386 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138386/testReport)** for PR 32389 at commit [`b2abb87`](https://github.com/apache/spark/commit/b2abb87f3dfafc03ade66d3c3bc333440fde761a).




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843399879


   **[Test build #138690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138690/testReport)** for PR 32389 at commit [`eea80f5`](https://github.com/apache/spark/commit/eea80f5c8c487a7117036b7cfb963137a4b7eeb5).




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-839059542


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138386/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843538579


   **[Test build #138690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138690/testReport)** for PR 32389 at commit [`eea80f5`](https://github.com/apache/spark/commit/eea80f5c8c487a7117036b7cfb963137a4b7eeb5).
    * This patch **fails SparkR unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] Ngone51 commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r628668071



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,54 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  /**
+   * Get a blockByAddress iterator for a single BlockManagerId assuming all blocks have the same
+   * size and `blockMapId`.
+   */
+  private def getBlocksByAddressForSingleBM(

Review comment:
       Maybe this could be extended to support the case of multiple block managers, e.g.,
   
   ```scala
   val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
     (localBmId, localBlocks.keys.map(blockId => (blockId, 1L, 0)).toSeq),
     (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 1L, 1)).toSeq),
     (hostLocalBmId, hostLocalBlocks.keys.map(blockId => (blockId, 1L, 1)).toSeq)
   ).toIterator
   ```
   
   We can pass in a `Map[BlockManagerId, (blocks, size, mapIndex)]` instead.
   
   WDYT?
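
One possible shape for the map-based helper suggested above is sketched here. This is only an illustration, not the code that landed in the PR: `BlockManagerId` and `BlockId` are simplified stand-in case classes (the real ones live in `org.apache.spark.storage`), and `getBlocksByAddress` is a hypothetical name.

```scala
// Simplified stand-ins so the sketch compiles on its own; the real types
// are org.apache.spark.storage.BlockManagerId and BlockId.
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class BlockId(name: String)

object BlocksByAddressSketch {
  /**
   * Hypothetical helper: builds a blocksByAddress iterator from a map of
   * BlockManagerId -> (block IDs, size shared by all of those blocks, map index).
   */
  def getBlocksByAddress(
      blocksByBm: Map[BlockManagerId, (Iterable[BlockId], Long, Int)])
    : Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = {
    blocksByBm.iterator.map { case (bmId, (blockIds, size, mapIndex)) =>
      // Every block under one block manager gets the same size and map index.
      (bmId, blockIds.map(blockId => (blockId, size, mapIndex)).toSeq)
    }
  }
}
```

A general `Map` does not guarantee iteration order, so if a test depends on the order in which block managers are visited, a `Seq` of pairs or a `ListMap` may be a safer parameter type.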
   






[GitHub] [spark] Ngone51 commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-834961726


   cc @mridulm @tgravescs @attilapiros




[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843503770


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/43211/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-828794499


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42573/
   




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-836917482


   **[Test build #138338 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138338/testReport)** for PR 32389 at commit [`196cb06`](https://github.com/apache/spark/commit/196cb06f34d43292ecb33aae3334dcd1d51946fe).




[GitHub] [spark] SparkQA commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-843456226


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/43211/
   




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633729337



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,

Review comment:
       I think this is closely related to your [other comment](https://github.com/apache/spark/pull/32389#discussion_r632823782), so I'll respond to both here.
   
   I agree that this is tailored to the current set of tests and thus current usage, as opposed to potential future usage. If we were designing a public API, or even a private API in the production code, I would agree with you. But in this case, for a class-private method in a test file, I'm not convinced that designing for future possibilities is the right move. If someone later adds tests which do need blocks of different sizes, they can make the modification you've described, right? For now I would prefer simplicity, and we can introduce the additional complexity if necessary.
   
   WDYT? This is not a strong conviction on my side so if you are not convinced I can make changes as you proposed.






[GitHub] [spark] mridulm commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r634520674



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),

Review comment:
       You are right!
   So we had broken code before, sigh (that was lifted directly from line 171 in the previous class).






[GitHub] [spark] otterc commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
otterc commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r634587251



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,47 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, Seq[(BlockId, Long, Int)]],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.toIterator,
+      (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in),
+      maxBytesInFlight,
+      maxReqsInFlight,
+      maxBlocksInFlightPerAddress,
+      maxReqSizeShuffleToMem,
+      detectCorrupt,
+      detectCorruptUseExtraMemory,
+      shuffleMetrics.getOrElse(tContext.taskMetrics().createTempShuffleReadMetrics()),
+      doBatchFetch)
+  }
+  // scalastyle:on argcount
+
+  /**
+   * Convert a list of block IDs into a list of blocks with metadata, assuming all blocks have the
+   * same size and index.
+   */
+  private def toBlockList(blockIds: Traversable[BlockId], blockSize: Long, blockMapIndex: Int)
+  : Seq[(BlockId, Long, Int)] = {

Review comment:
       Nit: please check whether the indentation here needs to be 4 spaces. Since the signature doesn't fit on one line, maybe change it to the format below:
   ```
   private def toBlockList(
       blockIds: ...,
       blockSize:...,
        ....): Seq
   ```
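   
   For illustration only: a sketch of the helper written in that multi-line style, with each parameter on its own line at a 4-space indent and the return type following the closing parenthesis. `BlockId` is a simplified stand-in for Spark's class, `Iterable` replaces `Traversable`, and the body is an assumption based on the mapping shown in the earlier revision of the diff.
   
   ```scala
   // Simplified stand-in (assumption); the real suite uses Spark's BlockId.
   final case class BlockId(name: String)
   
   object ToBlockListSketch {
     /**
      * Convert block IDs into (BlockId, size, mapIndex) triples, assuming all blocks
      * have the same size and map index.
      */
     def toBlockList(
         blockIds: Iterable[BlockId],
         blockSize: Long,
         blockMapIndex: Int): Seq[(BlockId, Long, Int)] = {
       blockIds.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq
     }
   }
   ```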
        






[GitHub] [spark] AmplabJenkins commented on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-829532742


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138087/
   




[GitHub] [spark] xkrogen commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r633761349



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,

Review comment:
       I changed my mind on this; the increase in generality really introduced very little complexity. It took me less time to make the change than it did to write my initial response to your comment :)

##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -123,6 +131,42 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
   }
 
+  // scalastyle:off argcount
+  private def createShuffleBlockIteratorWithDefaults(
+      blocksByAddress: Map[BlockManagerId, (Traversable[BlockId], Long, Int)],
+      taskContext: Option[TaskContext] = None,
+      streamWrapperLimitSize: Option[Long] = None,
+      blockManager: Option[BlockManager] = None,
+      maxBytesInFlight: Long = Long.MaxValue,
+      maxReqsInFlight: Int = Int.MaxValue,
+      maxBlocksInFlightPerAddress: Int = Int.MaxValue,
+      maxReqSizeShuffleToMem: Int = Int.MaxValue,
+      detectCorrupt: Boolean = true,
+      detectCorruptUseExtraMemory: Boolean = true,
+      shuffleMetrics: Option[ShuffleReadMetricsReporter] = None,
+      doBatchFetch: Boolean = false): ShuffleBlockFetcherIterator = {
+    val tContext = taskContext.getOrElse(TaskContext.empty())
+    new ShuffleBlockFetcherIterator(
+      tContext,
+      transfer,
+      blockManager.getOrElse(createMockBlockManager()),
+      blocksByAddress.map { case (blockManagerId, (blocks, blockSize, blockMapIndex)) =>
+        (blockManagerId, blocks.map(blockId => (blockId, blockSize, blockMapIndex)).toSeq)
+      }.toIterator,
+      streamWrapperLimitSize
+        .map(limit => (_: BlockId, in: InputStream) => new LimitedInputStream(in, limit))
+        .getOrElse((_: BlockId, in: InputStream) => in),

Review comment:
       Actually, unfortunately this doesn't work, because some of the tests assume they have direct access to the input stream buffer in order to do some mock verification. So it's better not to wrap.
   
   But I was able to simplify it to:
   ```
   (_, in) => streamWrapperLimitSize.map(new LimitedInputStream(in, _)).getOrElse(in)
   ```
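   
   For illustration only: a self-contained sketch of the same pattern, showing that with no limit the wrapper hands back the very same stream instance, which is what lets tests inspect the underlying stream directly. `CappedInputStream` is a hypothetical stand-in for `LimitedInputStream`, and a `String` stands in for `BlockId`.
   
   ```scala
   import java.io.{FilterInputStream, InputStream}
   
   // Hypothetical stand-in for LimitedInputStream: allows at most `limit` bytes to be read.
   final class CappedInputStream(in: InputStream, limit: Long) extends FilterInputStream(in) {
     private var remaining = limit
   
     override def read(): Int = {
       if (remaining <= 0) -1
       else {
         val b = super.read()
         if (b >= 0) remaining -= 1
         b
       }
     }
   
     override def read(buf: Array[Byte], off: Int, len: Int): Int = {
       if (len == 0) 0
       else if (remaining <= 0) -1
       else {
         val n = super.read(buf, off, math.min(len.toLong, remaining).toInt)
         if (n > 0) remaining -= n
         n
       }
     }
   }
   
   object StreamWrapperSketch {
     // Wrap only when a limit is given; otherwise return the input stream untouched.
     def streamWrapper(limit: Option[Long]): (String, InputStream) => InputStream =
       (_, in) => limit.map(new CappedInputStream(in, _)).getOrElse(in)
   }
   ```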






[GitHub] [spark] SparkQA removed a comment on pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32389:
URL: https://github.com/apache/spark/pull/32389#issuecomment-838776322


   **[Test build #138386 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138386/testReport)** for PR 32389 at commit [`b2abb87`](https://github.com/apache/spark/commit/b2abb87f3dfafc03ade66d3c3bc333440fde761a).




[GitHub] [spark] Ngone51 commented on a change in pull request #32389: [SPARK-35263] [TEST] Refactor ShuffleBlockFetcherIteratorSuite to reduce duplicated code

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #32389:
URL: https://github.com/apache/spark/pull/32389#discussion_r628668376



##########
File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
##########
@@ -228,117 +272,72 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     when(blockManager.hostLocalDirManager).thenReturn(Some(hostLocalDirManager))
     when(mockExternalBlockStoreClient.getHostLocalDirs(any(), any(), any(), any()))
       .thenAnswer { invocation =>
-        val completableFuture = invocation.getArguments()(3)
-          .asInstanceOf[CompletableFuture[java.util.Map[String, Array[String]]]]
-        completableFuture.completeExceptionally(new Throwable("failed fetch"))
+        invocation.getArgument[CompletableFuture[java.util.Map[String, Array[String]]]](3)
+          .completeExceptionally(new Throwable("failed fetch"))
       }
 
     blockManager.hostLocalDirManager = Some(hostLocalDirManager)
     val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
       (hostLocalBmId, hostLocalBlocks.keys.map(blockId => (blockId, 1L, 1)).toSeq)
     ).toIterator

Review comment:
       Why does this not reuse `getBlocksByAddressForSingleBM`?



