Posted to commits@spark.apache.org by gu...@apache.org on 2017/08/30 23:17:04 UTC

spark git commit: [SPARK-21839][SQL] Support SQL config for ORC compression

Repository: spark
Updated Branches:
  refs/heads/master 6949a9c5c -> d8f454086


[SPARK-21839][SQL] Support SQL config for ORC compression

## What changes were proposed in this pull request?

This PR aims to support `spark.sql.orc.compression.codec`, analogous to Parquet's `spark.sql.parquet.compression.codec`, so that users can control ORC compression through SQLConf as well.

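As a rough illustration (not part of this patch), a minimal sketch of setting the new configuration once per session; the app name and output path are made up for the example, and Hive support is enabled because the ORC source lives in the Hive module in this Spark version:

```scala
import org.apache.spark.sql.SparkSession

// ORC is provided by the Hive module here, so Hive support is enabled.
val spark = SparkSession.builder()
  .appName("orc-codec-demo")
  .enableHiveSupport()
  .getOrCreate()

// Session-wide default codec for ORC writes; accepted values are
// none, uncompressed, snappy, zlib, and lzo (case-insensitive).
spark.conf.set("spark.sql.orc.compression.codec", "zlib")

// Subsequent ORC writes pick up the session default unless a per-write
// `compression` option or `orc.compress` overrides it.
spark.range(10).toDF("id").write.orc("/tmp/orc_zlib_output")
```
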
## How was this patch tested?

Passed Jenkins with new and updated test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #19055 from dongjoon-hyun/SPARK-21839.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f45408
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f45408
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f45408

Branch: refs/heads/master
Commit: d8f45408635d4fccac557cb1e877dfe9267fb326
Parents: 6949a9c
Author: Dongjoon Hyun <do...@apache.org>
Authored: Thu Aug 31 08:16:58 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Thu Aug 31 08:16:58 2017 +0900

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                |  5 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++++++
 .../org/apache/spark/sql/DataFrameWriter.scala  |  8 ++++--
 .../spark/sql/hive/orc/OrcFileFormat.scala      |  2 +-
 .../apache/spark/sql/hive/orc/OrcOptions.scala  | 18 +++++++-----
 .../spark/sql/hive/orc/OrcSourceSuite.scala     | 29 ++++++++++++++++++--
 6 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 01da0dc..cb847a0 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -851,8 +851,9 @@ class DataFrameWriter(OptionUtils):
         :param partitionBy: names of partitioning columns
         :param compression: compression codec to use when saving to file. This can be one of the
                             known case-insensitive shorten names (none, snappy, zlib, and lzo).
-                            This will override ``orc.compress``. If None is set, it uses the
-                            default value, ``snappy``.
+                            This will override ``orc.compress`` and
+                            ``spark.sql.orc.compression.codec``. If None is set, it uses the value
+                            specified in ``spark.sql.orc.compression.codec``.
 
         >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
         >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))

http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099..c407874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -322,6 +322,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
+    .doc("Sets the compression codec use when writing ORC files. Acceptable values include: " +
+      "none, uncompressed, snappy, zlib, lzo.")
+    .stringConf
+    .transform(_.toLowerCase(Locale.ROOT))
+    .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
+    .createWithDefault("snappy")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
     .doc("When true, enable filter pushdown for ORC files.")
     .booleanConf
@@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging {
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
+  def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+
   def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)

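A small, hedged sketch (not from the patch) of how the new entry is expected to behave when set at runtime, assuming an active `SparkSession` named `spark`: the `transform` lower-cases the value, and `checkValues` should reject anything outside the allowed set.

```scala
// Mixed case is accepted; the conf entry lower-cases the value.
spark.conf.set("spark.sql.orc.compression.codec", "ZLIB")

// A codec outside {none, uncompressed, snappy, zlib, lzo} should fail
// validation; "gzip" is used here only as an example of an invalid value.
try {
  spark.conf.set("spark.sql.orc.compression.codec", "gzip")
} catch {
  case e: IllegalArgumentException => println(s"Rejected: ${e.getMessage}")
}
```
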
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cca9352..07347d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    *
    * You can set the following ORC-specific option(s) for writing ORC files:
    * <ul>
-   * <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be
-   * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
-   * This will override `orc.compress`.</li>
+   * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
+   * compression codec to use when saving to file. This can be one of the known case-insensitive
+   * shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
+   * `orc.compress` and `spark.sql.parquet.compression.codec`. If `orc.compress` is given,
+   * it overrides `spark.sql.parquet.compression.codec`.</li>
    * </ul>
    *
    * @since 1.5.0

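To make the documented precedence concrete, a usage sketch under the same assumptions as above (an active `SparkSession` named `spark`, made-up output paths): the per-write `compression` option wins over the session-level config.

```scala
spark.conf.set("spark.sql.orc.compression.codec", "snappy")

// Uses the session default, snappy.
spark.range(10).write.orc("/tmp/orc_snappy_default")

// The writer option overrides both `orc.compress` and the SQL config.
spark.range(10).write.option("compression", "zlib").orc("/tmp/orc_zlib_override")
```
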
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 3a34ec5..edf2013 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -68,7 +68,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val orcOptions = new OrcOptions(options)
+    val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
 
     val configuration = job.getConfiguration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 043eb69..7f94c8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -20,30 +20,34 @@ package org.apache.spark.sql.hive.orc
 import java.util.Locale
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[orc] class OrcOptions(
+    @transient private val parameters: CaseInsensitiveMap[String],
+    @transient private val sqlConf: SQLConf)
   extends Serializable {
 
   import OrcOptions._
 
-  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  def this(parameters: Map[String, String], sqlConf: SQLConf) =
+    this(CaseInsensitiveMap(parameters), sqlConf)
 
   /**
-   * Compression codec to use. By default snappy compression.
+   * Compression codec to use.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    // `orc.compress` is a ORC configuration. So, here we respect this as an option but
-    // `compression` has higher precedence than `orc.compress`. It means if both are set,
-    // we will use `compression`.
+    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are
+    // listed in order of precedence, from highest to lowest.
     val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
-      .getOrElse("snappy").toLowerCase(Locale.ROOT)
+      .getOrElse(sqlConf.orcCompressionCodec)
+      .toLowerCase(Locale.ROOT)
     if (!shortOrcCompressionCodecNames.contains(codecName)) {
       val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
       throw new IllegalArgumentException(s"Codec [$codecName] " +

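For the precedence between the two writer-side options, a brief sketch under the same assumptions (an active `SparkSession` named `spark`, illustrative paths): `orc.compress` is honored on its own but loses to `compression` when both are supplied, mirroring the comment above.

```scala
// The native ORC key is respected when `compression` is absent.
spark.range(10).write.option("orc.compress", "zlib").orc("/tmp/orc_native_key")

// When both are given, `compression` takes precedence, so this writes uncompressed data.
spark.range(10).write
  .option("orc.compress", "zlib")
  .option("compression", "none")
  .orc("/tmp/orc_compression_wins")
```
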
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 52fa401..781de66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,8 +22,8 @@ import java.io.File
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -149,7 +149,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
   }
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
-    assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+    val conf = sqlContext.sessionState.conf
+    assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
   }
 
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -194,6 +195,30 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
       Utils.deleteRecursively(location)
     }
   }
+
+  test("SPARK-21839: Add SQL config for ORC compression") {
+    val conf = sqlContext.sessionState.conf
+    // Test if the default of spark.sql.orc.compression.codec is snappy
+    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+
+    // OrcOptions' parameters take precedence over the SQL configuration:
+    // `compression` -> `orc.compress` -> `spark.sql.orc.compression.codec`
+    withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
+      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+      val map1 = Map("orc.compress" -> "zlib")
+      val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
+      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+    }
+
+    // Test all the valid options of spark.sql.orc.compression.codec
+    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
+      withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
+        val expected = if (c == "UNCOMPRESSED") "NONE" else c
+        assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+      }
+    }
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org