Posted to commits@spark.apache.org by li...@apache.org on 2017/10/16 18:27:10 UTC

spark git commit: [SPARK-22282][SQL] Rename OrcRelation to OrcFileFormat and remove ORC_COMPRESSION

Repository: spark
Updated Branches:
  refs/heads/master 0fa10666c -> 561505e2f


[SPARK-22282][SQL] Rename OrcRelation to OrcFileFormat and remove ORC_COMPRESSION

## What changes were proposed in this pull request?

This PR aims to
- Rename the `OrcRelation` object to `OrcFileFormat`.
- Replace `OrcRelation.ORC_COMPRESSION` with `org.apache.orc.OrcConf.COMPRESS`. Since [SPARK-21422](https://issues.apache.org/jira/browse/SPARK-21422), we can use `OrcConf.COMPRESS` instead of Hive's hard-coded constant:

```scala
// The references of Hive's classes will be minimized.
val ORC_COMPRESSION = "orc.compress"
```
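
For reference, a minimal sketch (assuming the `orc-core` artifact is on the classpath; the assertion is illustrative and not part of this patch) showing that the library constant resolves to the same key string the removed field hard-coded:

```scala
import org.apache.orc.OrcConf

// OrcConf.COMPRESS is the ORC library's own handle for the
// "orc.compress" key, so Spark can drop its Hive-flavored copy.
assert(OrcConf.COMPRESS.getAttribute == "orc.compress")
```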

## How was this patch tested?

Passed Jenkins with the existing and updated test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #19502 from dongjoon-hyun/SPARK-22282.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/561505e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/561505e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/561505e2

Branch: refs/heads/master
Commit: 561505e2fc290fc2cee3b8464ec49df773dca5eb
Parents: 0fa1066
Author: Dongjoon Hyun <do...@apache.org>
Authored: Mon Oct 16 11:27:08 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Oct 16 11:27:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala    |  4 ++--
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 18 ++++++++----------
 .../apache/spark/sql/hive/orc/OrcOptions.scala    |  8 +++++---
 .../apache/spark/sql/hive/orc/OrcQuerySuite.scala | 11 ++++++-----
 .../spark/sql/hive/orc/OrcSourceSuite.scala       |  9 ++++++---
 5 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/561505e2/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 07347d2..c9e4543 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -520,8 +520,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * <li>`compression` (default is the value specified in `spark.sql.orc.compression.codec`):
    * compression codec to use when saving to file. This can be one of the known case-insensitive
    * shorten names(`none`, `snappy`, `zlib`, and `lzo`). This will override
-   * `orc.compress` and `spark.sql.parquet.compression.codec`. If `orc.compress` is given,
-   * it overrides `spark.sql.parquet.compression.codec`.</li>
+   * `orc.compress` and `spark.sql.orc.compression.codec`. If `orc.compress` is given,
+   * it overrides `spark.sql.orc.compression.codec`.</li>
    * </ul>
    *
    * @since 1.5.0
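
The corrected documentation above describes a three-level precedence. As a hedged usage sketch (assuming a running `SparkSession` named `spark`; the output path is illustrative), the `compression` option wins over `orc.compress`, which in turn wins over the session conf:

```scala
// Lowest precedence: the session-wide default codec.
spark.conf.set("spark.sql.orc.compression.codec", "zlib")

spark.range(10).write
  .option("orc.compress", "snappy")  // overrides the session conf
  .option("compression", "none")     // overrides orc.compress and wins
  .orc("/tmp/orc-compression-demo")  // hypothetical path
```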

http://git-wip-us.apache.org/repos/asf/spark/blob/561505e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 194e69c..d26ec15 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.{NullWritable, Writable}
 import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.orc.OrcConf.COMPRESS
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.SparkSession
@@ -72,7 +73,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
     val configuration = job.getConfiguration
 
-    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+    configuration.set(COMPRESS.getAttribute, orcOptions.compressionCodec)
     configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
@@ -93,8 +94,8 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
 
       override def getFileExtension(context: TaskAttemptContext): String = {
         val compressionExtension: String = {
-          val name = context.getConfiguration.get(OrcRelation.ORC_COMPRESSION)
-          OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
+          val name = context.getConfiguration.get(COMPRESS.getAttribute)
+          OrcFileFormat.extensionsForCompressionCodecNames.getOrElse(name, "")
         }
 
         compressionExtension + ".orc"
@@ -120,7 +121,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcRelation.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.set(OrcFileFormat.SARG_PUSHDOWN, f.toKryo)
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
@@ -138,7 +139,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
       if (isEmptyFile) {
         Iterator.empty
       } else {
-        OrcRelation.setRequiredColumns(conf, dataSchema, requiredSchema)
+        OrcFileFormat.setRequiredColumns(conf, dataSchema, requiredSchema)
 
         val orcRecordReader = {
           val job = Job.getInstance(conf)
@@ -160,7 +161,7 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
         Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => recordsIterator.close()))
 
         // Unwraps `OrcStruct`s to `UnsafeRow`s
-        OrcRelation.unwrapOrcStructs(
+        OrcFileFormat.unwrapOrcStructs(
           conf,
           dataSchema,
           requiredSchema,
@@ -255,10 +256,7 @@ private[orc] class OrcOutputWriter(
   }
 }
 
-private[orc] object OrcRelation extends HiveInspectors {
-  // The references of Hive's classes will be minimized.
-  val ORC_COMPRESSION = "orc.compress"
-
+private[orc] object OrcFileFormat extends HiveInspectors {
   // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
   private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
 

http://git-wip-us.apache.org/repos/asf/spark/blob/561505e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index 7f94c8c..6ce90c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.orc
 
 import java.util.Locale
 
+import org.apache.orc.OrcConf.COMPRESS
+
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 
@@ -40,9 +42,9 @@ private[orc] class OrcOptions(
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
    */
   val compressionCodec: String = {
-    // `compression`, `orc.compress`, and `spark.sql.orc.compression.codec` are
-    // in order of precedence from highest to lowest.
-    val orcCompressionConf = parameters.get(OrcRelation.ORC_COMPRESSION)
+    // `compression`, `orc.compress`(i.e., OrcConf.COMPRESS), and `spark.sql.orc.compression.codec`
+    // are in order of precedence from highest to lowest.
+    val orcCompressionConf = parameters.get(COMPRESS.getAttribute)
     val codecName = parameters
       .get("compression")
       .orElse(orcCompressionConf)
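
A simplified sketch of the precedence resolution above (not Spark's actual `OrcOptions` class; the helper name is illustrative): `compression` beats `orc.compress`, which beats the session default.

```scala
// Hypothetical helper mirroring the orElse chain in OrcOptions.
def resolveCodec(parameters: Map[String, String], sessionDefault: String): String =
  parameters.get("compression")
    .orElse(parameters.get("orc.compress"))
    .getOrElse(sessionDefault)

resolveCodec(Map("orc.compress" -> "zlib"), "snappy")                         // "zlib"
resolveCodec(Map("compression" -> "lzo", "orc.compress" -> "zlib"), "snappy") // "lzo"
```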

http://git-wip-us.apache.org/repos/asf/spark/blob/561505e2/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 60ccd99..1fa9091 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -22,6 +22,7 @@ import java.sql.Timestamp
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, SparkOrcNewRecordReader}
+import org.apache.orc.OrcConf.COMPRESS
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
@@ -176,11 +177,11 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
-  test("SPARK-16610: Respect orc.compress option when compression is unset") {
-    // Respect `orc.compress`.
+  test("SPARK-16610: Respect orc.compress (i.e., OrcConf.COMPRESS) when compression is unset") {
+    // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "ZLIB")
+        .option(COMPRESS.getAttribute, "ZLIB")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -191,7 +192,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     withTempPath { file =>
       spark.range(0, 10).write
         .option("compression", "ZLIB")
-        .option("orc.compress", "SNAPPY")
+        .option(COMPRESS.getAttribute, "SNAPPY")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -598,7 +599,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       val requestedSchema = StructType(Nil)
       val conf = new Configuration()
       val physicalSchema = OrcFileOperator.readSchema(Seq(path), Some(conf)).get
-      OrcRelation.setRequiredColumns(conf, physicalSchema, requestedSchema)
+      OrcFileFormat.setRequiredColumns(conf, physicalSchema, requestedSchema)
       val maybeOrcReader = OrcFileOperator.getFileReader(path, Some(conf))
       assert(maybeOrcReader.isDefined)
       val orcRecordReader = new SparkOrcNewRecordReader(

http://git-wip-us.apache.org/repos/asf/spark/blob/561505e2/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 781de66..ef9e67c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.hive.orc
 
 import java.io.File
+import java.util.Locale
 
+import org.apache.orc.OrcConf.COMPRESS
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -150,7 +152,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = sqlContext.sessionState.conf
-    assert(new OrcOptions(Map("Orc.Compress" -> "NONE"), conf).compressionCodec == "NONE")
+    val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
+    assert(option.compressionCodec == "NONE")
   }
 
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
@@ -205,8 +208,8 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
     withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
       assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
-      val map1 = Map("orc.compress" -> "zlib")
-      val map2 = Map("orc.compress" -> "zlib", "compression" -> "lzo")
+      val map1 = Map(COMPRESS.getAttribute -> "zlib")
+      val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
       assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
       assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
     }

