Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/01 18:23:00 UTC

[GitHub] ankuriitg commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks

ankuriitg commented on a change in pull request #23453: [SPARK-26089][CORE] Handle corruption in large shuffle blocks
URL: https://github.com/apache/spark/pull/23453#discussion_r253145808
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
 ##########
 @@ -211,6 +213,49 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(os.toByteArray.toList.equals(bytes.toList))
   }
 
+  test("copyStreamUpTo") {
+    // input array initialization
+    val bytes = Array.ofDim[Byte](1200)
+    Random.nextBytes(bytes)
+
+    val limit = 1000
+    // testing for inputLength less than, equal to and greater than limit
+    List(900, 1000, 1100).foreach { inputLength =>
+      val in = new ByteArrayInputStream(bytes.take(inputLength))
+      val (fullyCopied: Boolean, mergedStream: InputStream) = Utils.copyStreamUpTo(in, limit, true)
+      try {
+        val byteBufferInputStream = if (mergedStream.isInstanceOf[ChunkedByteBufferInputStream]) {
+          mergedStream.asInstanceOf[ChunkedByteBufferInputStream]
+        } else {
+          val sequenceStream = mergedStream.asInstanceOf[SequenceInputStream]
+          val fieldValue = getFieldValue(sequenceStream, "in")
+          assert(fieldValue.isInstanceOf[ChunkedByteBufferInputStream])
+          fieldValue.asInstanceOf[ChunkedByteBufferInputStream]
+        }
+        assert(fullyCopied === (inputLength < limit))
+        (0 until inputLength).foreach { idx =>
+          assert(bytes(idx) === mergedStream.read().asInstanceOf[Byte])
+          if (idx == limit) {
+            assert(byteBufferInputStream.chunkedByteBuffer === null)
+          }
+        }
+        assert(mergedStream.read() === -1)
+        assert(byteBufferInputStream.chunkedByteBuffer === null)
+      } finally {
+        IOUtils.closeQuietly(mergedStream)
+        IOUtils.closeQuietly(in)
+      }
+    }
+  }
+
+  private def getFieldValue(obj: AnyRef, fieldName: String): Any = {
+    val mirror = universe.runtimeMirror(obj.getClass().getClassLoader())
 
 Review comment:
   Sure, it is easier for me as well
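
   For reference, the getFieldValue helper is cut off in the diff view above. A minimal, hypothetical sketch of a
   field-reading helper with the same purpose, using plain Java reflection instead of the Scala runtime reflection
   the PR's helper starts with (the name and structure below are illustrative, not the PR's actual code):

       // Hypothetical sketch, not the PR's code: read a (possibly non-public) field
       // declared directly on obj's class via plain Java reflection.
       private def readDeclaredField(obj: AnyRef, fieldName: String): Any = {
         // getDeclaredField only searches the declaring class, not its superclasses.
         val field = obj.getClass.getDeclaredField(fieldName)
         field.setAccessible(true)
         field.get(obj)
       }

   In the test above, such a helper could be used to pull the non-public "in" field out of the SequenceInputStream
   so the wrapped ChunkedByteBufferInputStream can be inspected directly (on recent JDKs this kind of access to
   java.io internals may additionally require an --add-opens flag).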

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org