Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/23 08:53:13 UTC

spark git commit: [SPARK-12872][SQL] Support to specify the option for compression codec for JSON datasource

Repository: spark
Updated Branches:
  refs/heads/master ea5c38fe7 -> 5af5a0216


[SPARK-12872][SQL] Support to specify the option for compression codec for JSON datasource

https://issues.apache.org/jira/browse/SPARK-12872

This PR lets the JSON datasource compress its output via an option instead of requiring Hadoop configurations to be set manually.
Codec names are resolved in the same way as in https://github.com/apache/spark/pull/10805.

As `CSVCompressionCodecs` can be shared with other datasources, it was extracted into a shared class, `CompressionCodecs`.
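
For reference, writing with the new option looks roughly like this (a minimal
sketch; `df` and `path` stand in for any DataFrame and output path):

    // Short names "bzip2", "gzip", "lz4" and "snappy" resolve to Hadoop codec classes.
    df.write
      .format("json")
      .option("compression", "gzip")
      .save(path)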

Author: hyukjinkwon <gu...@gmail.com>

Closes #10858 from HyukjinKwon/SPARK-12872.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5af5a021
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5af5a021
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5af5a021

Branch: refs/heads/master
Commit: 5af5a02160b42115579003b749c4d1831bf9d48e
Parents: ea5c38f
Author: hyukjinkwon <gu...@gmail.com>
Authored: Fri Jan 22 23:53:12 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 22 23:53:12 2016 -0800

----------------------------------------------------------------------
 .../datasources/CompressionCodecs.scala         | 47 ++++++++++++++++++++
 .../datasources/csv/CSVParameters.scala         | 28 +-----------
 .../datasources/json/JSONOptions.scala          | 12 +++--
 .../datasources/json/JSONRelation.scala         | 10 +++++
 .../execution/datasources/json/JsonSuite.scala  | 28 ++++++++++++
 5 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
new file mode 100644
index 0000000..e683a95
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
+
+import org.apache.spark.util.Utils
+
+private[datasources] object CompressionCodecs {
+  private val shortCompressionCodecNames = Map(
+    "bzip2" -> classOf[BZip2Codec].getName,
+    "gzip" -> classOf[GzipCodec].getName,
+    "lz4" -> classOf[Lz4Codec].getName,
+    "snappy" -> classOf[SnappyCodec].getName)
+
+  /**
+   * Return the full version of the given codec class.
+   * If it is already a class name, just return it.
+   */
+  def getCodecClassName(name: String): String = {
+    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+    try {
+      // Validate the codec name
+      Utils.classForName(codecName)
+      codecName
+    } catch {
+      case e: ClassNotFoundException =>
+        throw new IllegalArgumentException(s"Codec [$codecName] " +
+          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+    }
+  }
+}
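
In other words, the lookup behaves as follows (an illustrative sketch, not
part of the diff):

    CompressionCodecs.getCodecClassName("gzip")
    // -> "org.apache.hadoop.io.compress.GzipCodec"
    CompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.BZip2Codec")
    // -> returned as-is, since it already names a loadable class
    CompressionCodecs.getCodecClassName("foo")
    // -> throws IllegalArgumentException listing the known short names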

http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
index 676a3d3..0278675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
@@ -22,6 +22,7 @@ import java.nio.charset.Charset
 import org.apache.hadoop.io.compress._
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
 import org.apache.spark.util.Utils
 
 private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
@@ -78,7 +79,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
 
   val compressionCodec: Option[String] = {
     val name = parameters.get("compression").orElse(parameters.get("codec"))
-    name.map(CSVCompressionCodecs.getCodecClassName)
+    name.map(CompressionCodecs.getCodecClassName)
   }
 
   val maxColumns = 20480
@@ -114,28 +115,3 @@ private[csv] object ParseModes {
    true // We default to permissive if the mode string is not valid
   }
 }
-
-private[csv] object CSVCompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
-
-  /**
-   * Return the full version of the given codec class.
-   * If it is already a class name, just return it.
-   */
-  def getCodecClassName(name: String): String = {
-    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
-    try {
-      // Validate the codec name
-      Utils.classForName(codecName)
-      codecName
-    } catch {
-      case e: ClassNotFoundException =>
-        throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
-    }
-  }
-}
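
The CSV datasource keeps its existing behavior through the shared helper; for
example (a minimal sketch; `df` and `path` are assumed):

    df.write
      .format("csv")
      .option("codec", "bzip2") // "compression" is accepted as well, per CSVParameters above
      .save(path)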

http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index aee9cf2..e74a76c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
+
 /**
  * Options for the JSON data source.
  *
@@ -32,7 +34,8 @@ case class JSONOptions(
     allowSingleQuotes: Boolean = true,
     allowNumericLeadingZeros: Boolean = false,
     allowNonNumericNumbers: Boolean = false,
-    allowBackslashEscapingAnyCharacter: Boolean = false) {
+    allowBackslashEscapingAnyCharacter: Boolean = false,
+    compressionCodec: Option[String] = None) {
 
   /** Sets config options on a Jackson [[JsonFactory]]. */
   def setJacksonOptions(factory: JsonFactory): Unit = {
@@ -46,7 +49,6 @@ case class JSONOptions(
   }
 }
 
-
 object JSONOptions {
   def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
     samplingRatio =
@@ -64,6 +66,10 @@ object JSONOptions {
     allowNonNumericNumbers =
       parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true),
     allowBackslashEscapingAnyCharacter =
-      parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+      parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false),
+    compressionCodec = {
+      val name = parameters.get("compression").orElse(parameters.get("codec"))
+      name.map(CompressionCodecs.getCodecClassName)
+    }
   )
 }
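
With this change, the codec option flows through `createFromConfigMap` (a
sketch using the names from the diff above):

    val options = JSONOptions.createFromConfigMap(Map("compression" -> "gzip"))
    // options.compressionCodec == Some("org.apache.hadoop.io.compress.GzipCodec")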

http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 31c5620..93727ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -162,6 +163,15 @@ private[sql] class JSONRelation(
   }
 
   override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
+    val conf = job.getConfiguration
+    options.compressionCodec.foreach { codec =>
+      conf.set("mapreduce.output.fileoutputformat.compress", "true")
+      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+      conf.set("mapreduce.map.output.compress", "true")
+      conf.set("mapreduce.map.output.compress.codec", codec)
+    }
+
     new BucketedOutputWriterFactory {
       override def newInstance(
           path: String,
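
For comparison, these are roughly the Hadoop properties one previously had to
set by hand for the same effect (a sketch; `sc` stands for an assumed
SparkContext):

    import org.apache.hadoop.io.compress.GzipCodec
    sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")
    sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress.codec",
      classOf[GzipCodec].getName)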

http://git-wip-us.apache.org/repos/asf/spark/blob/5af5a021/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a3c6a1d..d22fa79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1467,6 +1467,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("SPARK-12872 Support to specify the option for compression codec") {
+    withTempDir { dir =>
+      // `withTempDir` pre-creates the directory; delete it so saveAsTextFile can create it
+      dir.delete()
+      val path = dir.getCanonicalPath
+      primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val jsonDF = sqlContext.read.json(path)
+      val jsonDir = new File(dir, "json").getCanonicalPath
+      jsonDF.coalesce(1).write
+        .format("json")
+        .option("compression", "gZiP")
+        .save(jsonDir)
+
+      val compressedFiles = new File(jsonDir).listFiles()
+      assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+
+      val jsonCopy = sqlContext.read
+        .format("json")
+        .load(jsonDir)
+
+      assert(jsonCopy.count == jsonDF.count)
+      val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+      val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+      checkAnswer(jsonCopySome, jsonDFSome)
+    }
+  }
+
   test("Casting long as timestamp") {
     withTempTable("jsonTable") {
       val schema = (new StructType).add("ts", TimestampType)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org