You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/03/07 22:57:35 UTC

[GitHub] [spark] squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks

squito commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r263603012
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##########
 @@ -425,28 +450,97 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     intercept[FetchFailedException] { iterator.next() }
   }
 
-  test("big blocks are not checked for corruption") {
-    val corruptBuffer = mockCorruptBuffer(10000L)
-
+  test("big blocks are also checked for corruption") {
+    val streamLength = 10000L
     val blockManager = mock(classOf[BlockManager])
-    val localBmId = BlockManagerId("test-client", "test-client", 1)
-    doReturn(localBmId).when(blockManager).blockManagerId
-    doReturn(corruptBuffer).when(blockManager).getBlockData(ShuffleBlockId(0, 0, 0))
-    val localBlockLengths = Seq[Tuple2[BlockId, Long]](
-      ShuffleBlockId(0, 0, 0) -> corruptBuffer.size()
+    val localBlockManagerId = BlockManagerId("local-client", "local-client", 1)
+    doReturn(localBlockManagerId).when(blockManager).blockManagerId
+
+    // This stream will throw IOException when the first byte is read
+    val corruptBuffer1 = mockCorruptBuffer(streamLength, 0)
+    val blockManagerId1 = BlockManagerId("remote-client-1", "remote-client-1", 1)
+    val shuffleBlockId1 = ShuffleBlockId(0, 1, 0)
+    val blockLengths1 = Seq[Tuple2[BlockId, Long]](
+      shuffleBlockId1 -> corruptBuffer1.size()
     )
 
-    val remoteBmId = BlockManagerId("test-client-1", "test-client-1", 2)
-    val remoteBlockLengths = Seq[Tuple2[BlockId, Long]](
-      ShuffleBlockId(0, 1, 0) -> corruptBuffer.size()
+    val streamNotCorruptTill = 8 * 1024
+    // This stream will throw exception after streamNotCorruptTill bytes are read
+    val corruptBuffer2 = mockCorruptBuffer(streamLength, streamNotCorruptTill)
+    val blockManagerId2 = BlockManagerId("remote-client-2", "remote-client-2", 2)
+    val shuffleBlockId2 = ShuffleBlockId(0, 2, 0)
+    val blockLengths2 = Seq[Tuple2[BlockId, Long]](
+      shuffleBlockId2 -> corruptBuffer2.size()
     )
 
     val transfer = createMockTransfer(
-      Map(ShuffleBlockId(0, 0, 0) -> corruptBuffer, ShuffleBlockId(0, 1, 0) -> corruptBuffer))
+      Map(shuffleBlockId1 -> corruptBuffer1, shuffleBlockId2 -> corruptBuffer2))
+    val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
+      (blockManagerId1, blockLengths1),
+      (blockManagerId2, blockLengths2)
+    ).toIterator
+    val taskContext = TaskContext.empty()
+    val maxBytesInFlight = 3 * 1024
+    val iterator = new ShuffleBlockFetcherIterator(
+      taskContext,
+      transfer,
+      blockManager,
+      blocksByAddress,
+      (_, in) => new LimitedInputStream(in, streamLength),
+      maxBytesInFlight,
+      Int.MaxValue,
+      Int.MaxValue,
+      Int.MaxValue,
+      true,
+      true,
+      taskContext.taskMetrics.createTempShuffleReadMetrics())
+
+    // Only one block should be returned which has corruption after maxBytesInFlight/3 because the
+    // other block will detect corruption on first fetch, and then get added to the queue again for
+    // a retry
 
 Review comment:
   I'd reword this -- you only get one block because you call next() once.  What the comment really needs to explain is why you get that particular block back.
   
   ```
   We'll get back the block which has corruption after maxBytesInFlight/3 because ...
   ```
   
   (assuming I understand this correctly...)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org