Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2014/12/26 21:32:51 UTC

[GitHub] spark pull request: [SPARK-2759][CORE] Generic Binary File Support...

Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1658#discussion_r22288031
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -510,6 +510,52 @@ class SparkContext(config: SparkConf) extends Logging {
           minPartitions).setName(path)
       }
     
    +
    +  /**
    +   * Get an RDD for a Hadoop-readable dataset as a PortableDataStream for each file
    +   * (useful for binary data).
    +   *
    +   * @param path Directory containing the input data files
    +   * @param minPartitions A suggested value for the minimal number of input splits.
    +   *
    +   * @note Small files are preferred; large files are also allowed, but may cause bad performance.
    +   */
    +  @DeveloperApi
    +  def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
    +  RDD[(String, PortableDataStream)] = {
    +    val job = new NewHadoopJob(hadoopConfiguration)
    +    NewFileInputFormat.addInputPath(job, new Path(path))
    +    val updateConf = job.getConfiguration
    +    new BinaryFileRDD(
    +      this,
    +      classOf[StreamInputFormat],
    +      classOf[String],
    +      classOf[PortableDataStream],
    +      updateConf,
    +      minPartitions).setName(path)
    +  }
    +
    +  /**
    +   * Load data from a flat binary file, assuming each record is a set of numbers
    +   * with the specified numerical format (see ByteBuffer), and the number of
    +   * bytes per record is constant (see FixedLengthBinaryInputFormat)
    +   *
    +   * @param path Directory containing the input data files
    +   * @param recordLength The fixed length, in bytes, of each record
    +   * @return An RDD of byte arrays, one per record (RDD[Array[Byte]])
    +   */
    +  def binaryRecords(path: String, recordLength: Int,
    +                    conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = {
    +    conf.setInt("recordLength", recordLength)
    +    val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
    +      classOf[FixedLengthBinaryInputFormat],
    +      classOf[LongWritable],
    +      classOf[BytesWritable],
    +      conf=conf)
    +    val data = br.map{ case (k, v) => v.getBytes}
    --- End diff --
    
    It turns out that `getBytes` returns a padded byte array, so I think you may need to copy / slice out the subarray with the data using `v.getLength`; see [HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes"](https://issues.apache.org/jira/browse/HADOOP-6298) for more details.
    
    Using `getBytes` without `getLength` has caused bugs in Spark in the past: #2712.
    
    Is the use of `getBytes` in this patch a bug?  Or is it somehow safe due to our use of FixedLengthBinaryInputFormat?  If it is somehow safe, we should have a comment which explains this so that readers who know about the `getBytes` issue aren't confused (or better yet, an `assert` that `getBytes` returns an array of the expected length).
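    
    For illustration, here is a minimal sketch of the kind of defensive copy I have in mind, reusing the `recordLength` parameter that is already in scope in `binaryRecords` (the assertion message is just an example):
    
    ```scala
    val data = br.map { case (k, v) =>
      // BytesWritable#getBytes may return a backing array that is longer than the
      // actual record, so only the first getLength bytes are valid (HADOOP-6298).
      assert(v.getLength == recordLength,
        s"Record has length ${v.getLength}, expected $recordLength")
      java.util.Arrays.copyOfRange(v.getBytes, 0, v.getLength)
    }
    ```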

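    As an aside, here is roughly how I would expect the two new methods to be used once this lands; the paths, record length, and app name below are made up for illustration:
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    object BinaryInputExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("binary-input-example"))
    
        // binaryFiles: one (path, PortableDataStream) pair per file, so each file's
        // contents can be opened lazily on the executors.
        val files = sc.binaryFiles("hdfs:///data/images")
        files.map { case (path, _) => path }.take(5).foreach(println)
    
        // binaryRecords: a flat file of fixed-length records; each element should be
        // an Array[Byte] of exactly recordLength bytes (512 is a made-up value).
        val records = sc.binaryRecords("hdfs:///data/sensor.bin", recordLength = 512)
        println(s"record count: ${records.count()}")
    
        sc.stop()
      }
    }
    ```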
