You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/11/13 00:58:23 UTC

spark git commit: [SPARK-2672] support compressed file in wholeTextFile

Repository: spark
Updated Branches:
  refs/heads/master bd86118c4 -> d7d54a44e


[SPARK-2672] support compressed file in wholeTextFile

The wholeFile() can not read compressed files, it should be, just like textFile().

Author: Davies Liu <da...@databricks.com>

Closes #3005 from davies/whole and squashes the following commits:

a43fcfb [Davies Liu] remove semicolon
c83571a [Davies Liu] remove = if return type is Unit
83c844f [Davies Liu] Merge branch 'master' of github.com:apache/spark into whole
22e8b3e [Davies Liu] support compressed file in wholeTextFile


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d7d54a44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d7d54a44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d7d54a44

Branch: refs/heads/master
Commit: d7d54a44e3ada0e50febe64e9b037dc2c8f6ff61
Parents: bd86118
Author: Davies Liu <da...@databricks.com>
Authored: Wed Nov 12 15:58:12 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed Nov 12 15:58:12 2014 -0800

----------------------------------------------------------------------
 .../spark/input/WholeTextFileInputFormat.scala  | 20 +++++---
 .../spark/input/WholeTextFileRecordReader.scala | 52 ++++++++++++++++++--
 .../input/WholeTextFileRecordReaderSuite.scala  | 44 +++++++++++++++--
 3 files changed, 103 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d7d54a44/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 183bce3..d3601cc 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,14 +19,13 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 
 /**
  * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
@@ -34,17 +33,24 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
  * the value is the entire content of file.
  */
 
-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+  extends CombineFileInputFormat[String, String] with Configurable {
+
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {
 
-    new CombineFileRecordReader[String, String](
-      split.asInstanceOf[CombineFileSplit],
-      context,
-      classOf[WholeTextFileRecordReader])
+    val reader = new WholeCombineFileRecordReader(split, context)
+    reader.setConf(conf)
+    reader
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d54a44/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 3564ab2..6d59b24 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.input
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import com.google.common.io.{ByteStreams, Closeables}
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] {
+  extends RecordReader[String, String] with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
 
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
 
   override def nextKeyValue(): Boolean = {
     if (!processed) {
+      val conf = new Configuration
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)  // infers from file ext.
       val fileIn = fs.open(path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = if (codec != null) {
+        ByteStreams.toByteArray(codec.createInputStream(fileIn))
+      } else {
+        ByteStreams.toByteArray(fileIn)
+      }
+
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
       processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
     }
   }
 }
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+    split: InputSplit,
+    context: TaskAttemptContext)
+  extends CombineFileRecordReader[String, String](
+    split.asInstanceOf[CombineFileSplit],
+    context,
+    classOf[WholeTextFileRecordReader]
+  ) with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
+  override def initNextRecordReader(): Boolean = {
+    val r = super.initNextRecordReader()
+    if (r) {
+      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+    }
+    r
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d54a44/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 12d1c7b..98b0a16 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.SparkContext
 import org.apache.spark.util.Utils
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
 
 /**
  * Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
  */
 class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var sc: SparkContext = _
+  private var factory: CompressionCodecFactory = _
 
   override def beforeAll() {
     sc = new SparkContext("local", "test")
 
     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+    sc.hadoopConfiguration.set("io.compression.codecs",
+      "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
+    factory = new CompressionCodecFactory(sc.hadoopConfiguration)
   }
 
   override def afterAll() {
     sc.stop()
   }
 
-  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
-    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+                               compress: Boolean) = {
+    val out = if (compress) {
+      val codec = new GzipCodec
+      val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
+      codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
+    } else {
+      val path = s"${inputDir.toString}/$fileName"
+      new DataOutputStream(new FileOutputStream(path))
+    }
     out.write(contents, 0, contents.length)
     out.close()
   }
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents)
+      createNativeFile(dir, filename, contents, false)
     }
 
     val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
 
     Utils.deleteRecursively(dir)
   }
+
+  test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
+    val dir = Utils.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents, true)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read out does not fit with the actual value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last.split('.')(0)
+
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"file $filename contents can not match.")
+    }
+
+    Utils.deleteRecursively(dir)
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org