Posted to commits@spark.apache.org by sr...@apache.org on 2017/02/20 17:02:13 UTC

spark git commit: [SPARK-19646][CORE][STREAMING] binaryRecords replicates records in scala API

Repository: spark
Updated Branches:
  refs/heads/master 776b8f17c -> d0ecca607


[SPARK-19646][CORE][STREAMING] binaryRecords replicates records in scala API

## What changes were proposed in this pull request?

Use `BytesWritable.copyBytes`, not `getBytes`, because `getBytes` returns the underlying array, which may be reused across reads when later records don't need a different buffer size, as is the case with the fixed-length `binaryRecords` APIs.
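
To make the failure mode concrete, here is a minimal, self-contained Scala sketch; the `GetBytesVsCopyBytes` object and its sample records are made up for illustration, and only the Hadoop `BytesWritable` API is assumed. A single writable is reused the way a record reader reuses its value object, so references returned by `getBytes` all point at the same backing buffer, while `copyBytes()` snapshots each record:

```scala
// Minimal sketch of the failure mode (not part of this patch): a hypothetical
// driver that reuses one BytesWritable the way a record reader would.
import org.apache.hadoop.io.BytesWritable

object GetBytesVsCopyBytes {
  def main(args: Array[String]): Unit = {
    val writable = new BytesWritable()  // reused for every record, like a record reader's value
    val records = Seq(Array[Byte](1, 1, 1), Array[Byte](2, 2, 2), Array[Byte](3, 3, 3))

    // Buggy pattern: getBytes hands back the shared backing buffer, so every
    // saved reference ends up showing the last record that was read.
    val viaGetBytes = records.map { rec =>
      writable.set(rec, 0, rec.length)
      writable.getBytes  // same (possibly padded) array on each call once capacity suffices
    }
    println(viaGetBytes.map(_.take(3).toSeq))  // three identical "replicated" records

    // Fixed pattern from the patch: copyBytes() returns a fresh array of exactly
    // the record's valid length, so each element keeps its own data.
    val viaCopyBytes = records.map { rec =>
      writable.set(rec, 0, rec.length)
      writable.copyBytes()
    }
    println(viaCopyBytes.map(_.toSeq))  // (1,1,1), (2,2,2), (3,3,3)
  }
}
```

As a side effect, `copyBytes()` trims any buffer padding to the valid length, so the `bytes.length == recordLength` checks in the hunks below hold without slicing.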

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #16974 from srowen/SPARK-19646.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0ecca60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0ecca60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0ecca60

Branch: refs/heads/master
Commit: d0ecca6075d86bedebf8bc2278085a2cd6cb0a43
Parents: 776b8f1
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Feb 20 09:02:09 2017 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Feb 20 09:02:09 2017 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   5 +-
 .../test/scala/org/apache/spark/FileSuite.scala | 178 ++++---------------
 .../spark/streaming/StreamingContext.scala      |   5 +-
 .../spark/streaming/InputStreamsSuite.scala     |  21 +--
 4 files changed, 53 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0ecca60/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e4d8389..17194b9 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -961,12 +961,11 @@ class SparkContext(config: SparkConf) extends Logging {
       classOf[LongWritable],
       classOf[BytesWritable],
       conf = conf)
-    val data = br.map { case (k, v) =>
-      val bytes = v.getBytes
+    br.map { case (k, v) =>
+      val bytes = v.copyBytes()
       assert(bytes.length == recordLength, "Byte array does not have correct length")
       bytes
     }
-    data
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d0ecca60/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 6538507..a2d3177 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io._
+import java.nio.ByteBuffer
 import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
@@ -30,7 +31,6 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat}
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 
-import org.apache.spark.input.PortableDataStream
 import org.apache.spark.internal.config.IGNORE_CORRUPT_FILES
 import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
 import org.apache.spark.storage.StorageLevel
@@ -237,24 +237,26 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
-  test("binary file input as byte array") {
-    sc = new SparkContext("local", "test")
+  private def writeBinaryData(testOutput: Array[Byte], testOutputCopies: Int): File = {
     val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
-    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
+    val file = new FileOutputStream(outFile)
     val channel = file.getChannel
-    channel.write(bbuf)
+    for (i <- 0 until testOutputCopies) {
+      // Shift values by i so that they're different in the output
+      val alteredOutput = testOutput.map(b => (b + i).toByte)
+      channel.write(ByteBuffer.wrap(alteredOutput))
+    }
     channel.close()
     file.close()
+    outFile
+  }
 
-    val inRdd = sc.binaryFiles(outFileName)
-    val (infile: String, indata: PortableDataStream) = inRdd.collect.head
-
+  test("binary file input as byte array") {
+    sc = new SparkContext("local", "test")
+    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
+    val (infile, indata) = inRdd.collect().head
     // Make sure the name and array match
     assert(infile.contains(outFile.toURI.getPath)) // a prefix may get added
     assert(indata.toArray === testOutput)
@@ -262,159 +264,55 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
   test("portabledatastream caching tests") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName).cache()
-    inRdd.foreach{
-      curData: (String, PortableDataStream) =>
-       curData._2.toArray() // force the file to read
-    }
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
+    inRdd.foreach(_._2.toArray()) // force the file to read
     // Try reading the output back as an object file
-
-    assert(indata.toArray === testOutput)
+    assert(inRdd.values.collect().head.toArray === testOutput)
   }
 
   test("portabledatastream persist disk storage") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName).persist(StorageLevel.DISK_ONLY)
-    inRdd.foreach{
-      curData: (String, PortableDataStream) =>
-        curData._2.toArray() // force the file to read
-    }
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val (infile: String, indata: PortableDataStream) = mappedRdd.collect.head
-
-    // Try reading the output back as an object file
-
-    assert(indata.toArray === testOutput)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath).persist(StorageLevel.DISK_ONLY)
+    inRdd.foreach(_._2.toArray()) // force the file to read
+    assert(inRdd.values.collect().head.toArray === testOutput)
   }
 
   test("portabledatastream flatmap tests") {
     sc = new SparkContext("local", "test")
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
+    val outFile = writeBinaryData(testOutput, 1)
+    val inRdd = sc.binaryFiles(outFile.getAbsolutePath)
     val numOfCopies = 3
-    val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    channel.write(bbuf)
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryFiles(outFileName)
-    val mappedRdd = inRdd.map {
-      curData: (String, PortableDataStream) =>
-        (curData._2.getPath(), curData._2)
-    }
-    val copyRdd = mappedRdd.flatMap {
-      curData: (String, PortableDataStream) =>
-        for (i <- 1 to numOfCopies) yield (i, curData._2)
-    }
-
-    val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect()
-
-    // Try reading the output back as an object file
+    val copyRdd = inRdd.flatMap(curData => (0 until numOfCopies).map(_ => curData._2))
+    val copyArr = copyRdd.collect()
     assert(copyArr.length == numOfCopies)
-    copyArr.foreach{
-      cEntry: (Int, PortableDataStream) =>
-        assert(cEntry._2.toArray === testOutput)
+    for (i <- copyArr.indices) {
+      assert(copyArr(i).toArray === testOutput)
     }
-
   }
 
   test("fixed record length binary file as byte array") {
-    // a fixed length of 6 bytes
-
     sc = new SparkContext("local", "test")
-
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
     val testOutputCopies = 10
-
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    for(i <- 1 to testOutputCopies) {
-      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-      channel.write(bbuf)
-    }
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryRecords(outFileName, testOutput.length)
-    // make sure there are enough elements
+    val outFile = writeBinaryData(testOutput, testOutputCopies)
+    val inRdd = sc.binaryRecords(outFile.getAbsolutePath, testOutput.length)
     assert(inRdd.count == testOutputCopies)
-
-    // now just compare the first one
-    val indata: Array[Byte] = inRdd.collect.head
-    assert(indata === testOutput)
+    val inArr = inRdd.collect()
+    for (i <- inArr.indices) {
+      assert(inArr(i) === testOutput.map(b => (b + i).toByte))
+    }
   }
 
   test ("negative binary record length should raise an exception") {
-    // a fixed length of 6 bytes
     sc = new SparkContext("local", "test")
-
-    val outFile = new File(tempDir, "record-bytestream-00000.bin")
-    val outFileName = outFile.getAbsolutePath()
-
-    // create file
-    val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
-    val testOutputCopies = 10
-
-    // write data to file
-    val file = new java.io.FileOutputStream(outFile)
-    val channel = file.getChannel
-    for(i <- 1 to testOutputCopies) {
-      val bbuf = java.nio.ByteBuffer.wrap(testOutput)
-      channel.write(bbuf)
-    }
-    channel.close()
-    file.close()
-
-    val inRdd = sc.binaryRecords(outFileName, -1)
-
+    val outFile = writeBinaryData(Array[Byte](1, 2, 3, 4, 5, 6), 1)
     intercept[SparkException] {
-      inRdd.count
+      sc.binaryRecords(outFile.getAbsolutePath, -1).count()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0ecca60/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 0a4c141..a34f6c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -435,13 +435,12 @@ class StreamingContext private[streaming] (
     conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
     val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
       directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
-    val data = br.map { case (k, v) =>
-      val bytes = v.getBytes
+    br.map { case (k, v) =>
+      val bytes = v.copyBytes()
       require(bytes.length == recordLength, "Byte array does not have correct length. " +
         s"${bytes.length} did not equal recordLength: $recordLength")
       bytes
     }
-    data
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d0ecca60/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 6fb50a4..b5d36a3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -84,7 +84,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
           // Verify whether all the elements received are as expected
           // (whether the elements were received one in each interval is not verified)
-          val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
+          val output = outputQueue.asScala.flatten.toArray
           assert(output.length === expectedOutput.size)
           for (i <- output.indices) {
             assert(output(i) === expectedOutput(i))
@@ -155,14 +155,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
         // not enough to trigger a batch
         clock.advance(batchDuration.milliseconds / 2)
 
-        val input = Seq(1, 2, 3, 4, 5)
-        input.foreach { i =>
+        val numCopies = 3
+        val input = Array[Byte](1, 2, 3, 4, 5)
+        for (i <- 0 until numCopies) {
           Thread.sleep(batchDuration.milliseconds)
           val file = new File(testDir, i.toString)
-          Files.write(Array[Byte](i.toByte), file)
+          Files.write(input.map(b => (b + i).toByte), file)
           assert(file.setLastModified(clock.getTimeMillis()))
           assert(file.lastModified === clock.getTimeMillis())
-          logInfo("Created file " + file)
+          logInfo(s"Created file $file")
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
           clock.advance(batchDuration.milliseconds)
@@ -170,10 +171,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
             assert(batchCounter.getNumCompletedBatches === i)
           }
         }
-
-        val expectedOutput = input.map(i => i.toByte)
-        val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
-        assert(obtainedOutput.toSeq === expectedOutput)
+        val obtainedOutput = outputQueue.asScala.map(_.flatten).toSeq
+        for (i <- obtainedOutput.indices) {
+          assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
+        }
       }
     } finally {
       if (testDir != null) Utils.deleteRecursively(testDir)
@@ -258,7 +259,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
     MultiThreadTestReceiver.haveAllThreadsFinished = false
     val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
-    def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
+    def output: Iterable[Long] = outputQueue.asScala.flatten
 
     // set up the network stream using the test receiver
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>

