You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2017/08/30 23:17:04 UTC
spark git commit: [SPARK-21839][SQL] Support SQL config for ORC
compression
Repository: spark
Updated Branches:
refs/heads/master 6949a9c5c -> d8f454086
[SPARK-21839][SQL] Support SQL config for ORC compression
## What changes were proposed in this pull request?
This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too.
## How was this patch tested?
Pass the Jenkins with new and updated test cases.
Author: Dongjoon Hyun <do...@apache.org>
Closes #19055 from dongjoon-hyun/SPARK-21839.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f45408
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f45408
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f45408
Branch: refs/heads/master
Commit: d8f45408635d4fccac557cb1e877dfe9267fb326
Parents: 6949a9c
Author: Dongjoon Hyun <do...@apache.org>
Authored: Thu Aug 31 08:16:58 2017 +0900
Committer: hyukjinkwon <gu...@gmail.com>
Committed: Thu Aug 31 08:16:58 2017 +0900
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 5 ++--
.../org/apache/spark/sql/internal/SQLConf.scala | 10 +++++++
.../org/apache/spark/sql/DataFrameWriter.scala | 8 ++++--
.../spark/sql/hive/orc/OrcFileFormat.scala | 2 +-
.../apache/spark/sql/hive/orc/OrcOptions.scala | 18 +++++++-----
.../spark/sql/hive/orc/OrcSourceSuite.scala | 29 ++++++++++++++++++--
6 files changed, 57 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 01da0dc..cb847a0 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -851,8 +851,9 @@ class DataFrameWriter(OptionUtils):
:param partitionBy: names of partitioning columns
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, snappy, zlib, and lzo).
- This will override ``orc.compress``. If None is set, it uses the
- default value, ``snappy``.
+ This will override ``orc.compress`` and
+ ``spark.sql.orc.compression.codec``. If None is set, it uses the value
+ specified in ``spark.sql.orc.compression.codec``.
>>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a685099..c407874 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -322,6 +322,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec")
+ .doc("Sets the compression codec used when writing ORC files. Acceptable values include: " +
+ "none, uncompressed, snappy, zlib, lzo.")
+ .stringConf
+ .transform(_.toLowerCase(Locale.ROOT))
+ .checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo"))
+ .createWithDefault("snappy")
+
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
@@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging {
def useCompression: Boolean = getConf(COMPRESS_CACHED)
+ def orcCompressionCodec: String = getConf(ORC_COMPRESSION)
+
def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index cca9352..07347d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*
* You can set the following ORC-specific option(s) for writing ORC files:
* <ul>
- * <li>`compression` (default `snappy`): compression codec to use when saving to file. This can be
- * one of the known case-insensitive shorten names(`none`, `snappy`, `zlib`, and `lzo`).
- * This will override `orc.compress`.</li>
- * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
- * compression codec to use when saving to file. This can be one of the known case-insensitive
- * shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
- * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
- * it overrides `spark.sql.orc.compression.codec`.</li>
* </ul>
*
* @since 1.5.0
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 3a34ec5..edf2013 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -68,7 +68,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- val orcOptions = new OrcOptions(options)
+ val orcOptions = new OrcOptions(options, sparkSession.sessionState.conf)
val configuration = job.getConfiguration
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 043eb69..7f94c8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -20,30 +20,34 @@ package org.apache.spark.sql.hive.orc
import java.util.Locale
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
/**
* Options for the ORC data source.
*/
-private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[orc] class OrcOptions(
+ @transient private val parameters: CaseInsensitiveMap[String],
+ @transient private val sqlConf: SQLConf)
extends Serializable {
import OrcOptions._
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], sqlConf: SQLConf) =
+ this(CaseInsensitiveMap(parameters), sqlConf)
/**
- * Compression codec to use. By default snappy compression.
+ * Compression codec to use.
* Acceptable values are defined in [[shortOrcCompressionCodecNames]].
*/
val compressionCodec: String = {
- // `orc.compress` is a ORC configuration. So, here we respect this as an option but
- // `compression` has higher precedence than `orc.compress`. It means if both are set,
- // we will use `compression`.
+ // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are
+ // in order of precedence from highest to lowest.
val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
val codecName = parameters
.get("compression")
.orElse(orcCompressionConf)
- .getOrElse("snappy").toLowerCase(Locale.ROOT)
+ .getOrElse(sqlConf.orcCompressionCodec)
+ .toLowerCase(Locale.ROOT)
if (!shortOrcCompressionCodecNames.contains(codecName)) {
val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase(Locale.ROOT))
throw new IllegalArgumentException(s"Codec [$codecName] " +
http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 52fa401..781de66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -22,8 +22,8 @@ import java.io.File
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.{QueryTest, Row}
-import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -149,7 +149,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
- assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+ val conf = sqlContext.sessionState.conf
+ assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
}
test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -194,6 +195,30 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
Utils.deleteRecursively(location)
}
}
+
+ test("SPARK-21839: Add SQL config for ORC compression") {
+ val conf = sqlContext.sessionState.conf
+ // Test if the default of spark.sql.orc.compression.codec is snappy
+ assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+
+ // OrcOptions's parameters have a higher priority than SQL configuration.
+ // `compression` -> `orc.compress` -> `spark.sql.orc.compression.codec`
+ withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
+ assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
+ val map1 = Map("orc.compress" -> "zlib")
+ val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+ assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
+ assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+ }
+
+ // Test all the valid options of spark.sql.orc.compression.codec
+ Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO").foreach { c =>
+ withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
+ val expected = if (c == "UNCOMPRESSED") "NONE" else c
+ assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
+ }
+ }
+ }
}
class OrcSourceSuite extends OrcSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org