You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/10/30 05:22:17 UTC

(spark) branch master updated: [SPARK-45664][SQL] Introduce a mapper for orc compression codecs

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f12bc05e440 [SPARK-45664][SQL] Introduce a mapper for orc compression codecs
f12bc05e440 is described below

commit f12bc05e44099f24c470466bf777473744ab893d
Author: Jiaan Geng <be...@163.com>
AuthorDate: Sun Oct 29 22:22:07 2023 -0700

    [SPARK-45664][SQL] Introduce a mapper for orc compression codecs
    
    ### What changes were proposed in this pull request?
    Currently, Spark supports all the ORC compression codecs, but the codecs supported by ORC and those supported by Spark do not map one-to-one, because Spark introduces two compression codecs, `NONE` and `UNCOMPRESSED` (both map to ORC's `CompressionKind.NONE`).
    
    On the other hand, there are a lot of magic strings copied from the ORC compression codec names. As a result, developers have to keep them consistent manually, which is error-prone and reduces development efficiency.
    
    ### Why are the changes needed?
    Make it easier for developers to use ORC compression codecs.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    Introduce a new class.
    
    ### How was this patch tested?
    Existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #43528 from beliefer/SPARK-45664.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../datasources/orc/OrcCompressionCodec.java       | 56 ++++++++++++++++++++++
 .../sql/execution/datasources/orc/OrcOptions.scala | 11 ++---
 .../sql/execution/datasources/orc/OrcUtils.scala   | 12 ++---
 .../BuiltInDataSourceWriteBenchmark.scala          |  4 +-
 .../benchmark/DataSourceReadBenchmark.scala        |  4 +-
 .../benchmark/FilterPushdownBenchmark.scala        |  3 +-
 .../datasources/FileSourceCodecSuite.scala         |  5 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  | 26 +++++-----
 .../execution/datasources/orc/OrcSourceSuite.scala | 29 +++++++----
 .../spark/sql/hive/CompressionCodecSuite.scala     | 23 ++++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  7 +--
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala    |  6 +--
 .../spark/sql/hive/orc/OrcReadBenchmark.scala      |  5 +-
 13 files changed, 134 insertions(+), 57 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java
new file mode 100644
index 00000000000..c8e57969068
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.orc.CompressionKind;
+
+/**
+ * Maps the ORC compression codecs supported by Spark to ORC's own {@link CompressionKind}.
+ */
+public enum OrcCompressionCodec {
+  NONE(CompressionKind.NONE),
+  UNCOMPRESSED(CompressionKind.NONE),
+  ZLIB(CompressionKind.ZLIB),
+  SNAPPY(CompressionKind.SNAPPY),
+  LZO(CompressionKind.LZO),
+  LZ4(CompressionKind.LZ4),
+  ZSTD(CompressionKind.ZSTD);
+
+  private final CompressionKind compressionKind;
+
+  OrcCompressionCodec(CompressionKind compressionKind) {
+    this.compressionKind = compressionKind;
+  }
+
+  public CompressionKind getCompressionKind() {
+    return this.compressionKind;
+  }
+
+  public static final Map<String, String> codecNameMap =
+    Arrays.stream(OrcCompressionCodec.values()).collect(
+      Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+  public String lowerCaseName() {
+    return codecNameMap.get(this.name());
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
index 1c819f07038..4bed600fa4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
@@ -75,14 +75,9 @@ object OrcOptions extends DataSourceOptions {
   val COMPRESSION = newOption("compression")
 
   // The ORC compression short names
-  private val shortOrcCompressionCodecNames = Map(
-    "none" -> "NONE",
-    "uncompressed" -> "NONE",
-    "snappy" -> "SNAPPY",
-    "zlib" -> "ZLIB",
-    "lzo" -> "LZO",
-    "lz4" -> "LZ4",
-    "zstd" -> "ZSTD")
+  private val shortOrcCompressionCodecNames = OrcCompressionCodec.values().map {
+    mapper => mapper.lowerCaseName() -> mapper.getCompressionKind.name()
+  }.toMap
 
   def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index d6d42e74da1..c4490b95e3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -49,12 +49,12 @@ object OrcUtils extends Logging {
 
   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
-    "NONE" -> "",
-    "SNAPPY" -> ".snappy",
-    "ZLIB" -> ".zlib",
-    "ZSTD" -> ".zstd",
-    "LZ4" -> ".lz4",
-    "LZO" -> ".lzo")
+    OrcCompressionCodec.NONE.name() -> "",
+    OrcCompressionCodec.SNAPPY.name() -> ".snappy",
+    OrcCompressionCodec.ZLIB.name() -> ".zlib",
+    OrcCompressionCodec.ZSTD.name() -> ".zstd",
+    OrcCompressionCodec.LZ4.name() -> ".lz4",
+    OrcCompressionCodec.LZO.name() -> ".lzo")
 
   val CATALYST_TYPE_ATTRIBUTE_NAME = "spark.sql.catalyst.type"
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index ba3228878ec..a8b448c8dc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import org.apache.parquet.column.ParquetProperties
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 
@@ -56,7 +57,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
 
     spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
       ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
-    spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
+    spark.conf.set(SQLConf.ORC_COMPRESSION.key,
+      OrcCompressionCodec.SNAPPY.lowerCaseName())
 
     formats.foreach { format =>
       runBenchmark(s"$format writer benchmark") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index a8736c04151..bc271235f9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -116,7 +117,8 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
   }
 
   private def saveAsOrcTable(df: DataFrameWriter[Row], dir: String): Unit = {
-    df.mode("overwrite").option("compression", "snappy").orc(dir)
+    df.mode("overwrite").option("compression",
+      OrcCompressionCodec.SNAPPY.lowerCaseName()).orc(dir)
     spark.read.orc(dir).createOrReplaceTempView("orcTable")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 10781ec90fa..1a788bf5f2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -25,6 +25,7 @@ import scala.util.Random
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
 import org.apache.spark.sql.internal.SQLConf
@@ -51,7 +52,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       .set("spark.master", "local[1]")
       .setIfMissing("spark.driver.memory", "3g")
       .setIfMissing("spark.executor.memory", "3g")
-      .setIfMissing("orc.compression", "snappy")
+      .setIfMissing("orc.compression", OrcCompressionCodec.SNAPPY.lowerCaseName())
       .setIfMissing("spark.sql.parquet.compression.codec",
         ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 1f1805a02d7..e4d9e13c2b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -22,6 +22,7 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -73,6 +74,6 @@ class OrcCodecSuite extends FileSourceCodecSuite {
 
   override def format: String = "orc"
   override val codecConfigName: String = SQLConf.ORC_COMPRESSION.key
-  override protected def availableCodecs = Seq("none", "uncompressed", "snappy",
-    "zlib", "zstd", "lz4", "lzo")
+  override protected def availableCodecs =
+    OrcCompressionCodec.values().map(_.lowerCaseName()).toSeq
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 39447ed71a8..7d666729bb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -190,7 +190,7 @@ abstract class OrcQueryTest extends OrcTest {
     // Respect `orc.compress` (i.e., OrcConf.COMPRESS).
     withTempPath { file =>
       spark.range(0, 10).write
-        .option(COMPRESS.getAttribute, "ZLIB")
+        .option(COMPRESS.getAttribute, OrcCompressionCodec.ZLIB.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -199,15 +199,15 @@ abstract class OrcQueryTest extends OrcTest {
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("ZLIB" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
       }
     }
 
     // `compression` overrides `orc.compress`.
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "ZLIB")
-        .option(COMPRESS.getAttribute, "SNAPPY")
+        .option("compression", OrcCompressionCodec.ZLIB.name())
+        .option(COMPRESS.getAttribute, OrcCompressionCodec.SNAPPY.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -216,7 +216,7 @@ abstract class OrcQueryTest extends OrcTest {
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("ZLIB" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
       }
     }
   }
@@ -224,7 +224,7 @@ abstract class OrcQueryTest extends OrcTest {
   test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "ZLIB")
+        .option("compression", OrcCompressionCodec.ZLIB.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -233,13 +233,13 @@ abstract class OrcQueryTest extends OrcTest {
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("ZLIB" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
       }
     }
 
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "SNAPPY")
+        .option("compression", OrcCompressionCodec.SNAPPY.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".snappy.orc"))
@@ -248,13 +248,13 @@ abstract class OrcQueryTest extends OrcTest {
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("SNAPPY" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.SNAPPY.name() === reader.getCompressionKind.name)
       }
     }
 
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "NONE")
+        .option("compression", OrcCompressionCodec.NONE.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".orc"))
@@ -263,7 +263,7 @@ abstract class OrcQueryTest extends OrcTest {
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("NONE" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.NONE.name() === reader.getCompressionKind.name)
       }
     }
   }
@@ -647,7 +647,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
   test("LZO compression options for writing to an ORC file") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("compression", "LZO")
+        .option("compression", OrcCompressionCodec.LZO.name())
         .orc(file.getCanonicalPath)
 
       val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".lzo.orc"))
@@ -656,7 +656,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
       val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
       val conf = OrcFile.readerOptions(new Configuration())
       Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
-        assert("LZO" === reader.getCompressionKind.name)
+        assert(OrcCompressionCodec.LZO.name() === reader.getCompressionKind.name)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 2ed24943908..4abcb4a7ef1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -324,29 +324,38 @@ abstract class OrcSuite
 
   test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
     val conf = spark.sessionState.conf
-    val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
-    assert(option.compressionCodec == "NONE")
+    val option = new OrcOptions(
+      Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> OrcCompressionCodec.NONE.name()), conf)
+    assert(option.compressionCodec == OrcCompressionCodec.NONE.name())
   }
 
   test("SPARK-21839: Add SQL config for ORC compression") {
     val conf = spark.sessionState.conf
     // Test if the default of spark.sql.orc.compression.codec is snappy
-    assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+    assert(new OrcOptions(
+      Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.SNAPPY.name())
 
     // OrcOptions's parameters have a higher priority than SQL configuration.
     // `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
     withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
-      assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
-      val map1 = Map(COMPRESS.getAttribute -> "zlib")
-      val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
-      assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
-      assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+      assert(new OrcOptions(
+        Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.NONE.name())
+      val zlibCodec = OrcCompressionCodec.ZLIB.lowerCaseName()
+      val lzoCodec = OrcCompressionCodec.LZO.lowerCaseName()
+      val map1 = Map(COMPRESS.getAttribute -> zlibCodec)
+      val map2 = Map(COMPRESS.getAttribute -> zlibCodec, "compression" -> lzoCodec)
+      assert(new OrcOptions(map1, conf).compressionCodec ==  OrcCompressionCodec.ZLIB.name())
+      assert(new OrcOptions(map2, conf).compressionCodec == OrcCompressionCodec.LZO.name())
     }
 
     // Test all the valid options of spark.sql.orc.compression.codec
-    Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD", "LZ4").foreach { c =>
+    OrcCompressionCodec.values().map(_.name()).foreach { c =>
       withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
-        val expected = if (c == "UNCOMPRESSED") "NONE" else c
+        val expected = if (c == OrcCompressionCodec.UNCOMPRESSED.name()) {
+          OrcCompressionCodec.NONE.name()
+        } else {
+          c
+        }
         assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index df28e7b4485..52138ae4558 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -23,12 +23,11 @@ import java.util.Locale
 import scala.jdk.CollectionConverters._
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 import org.apache.orc.OrcConf.COMPRESS
 import org.apache.parquet.hadoop.ParquetOutputFormat
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.orc.{OrcCompressionCodec, OrcOptions}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest}
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -299,9 +298,15 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         ParquetCompressionCodec.SNAPPY.name))
     checkForTableWithCompressProp("orc",
       tableCompressCodecs =
-        List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name),
+        List(
+          OrcCompressionCodec.NONE.name,
+          OrcCompressionCodec.SNAPPY.name,
+          OrcCompressionCodec.ZLIB.name),
       sessionCompressCodecs =
-        List(CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name, CompressionKind.SNAPPY.name))
+        List(
+          OrcCompressionCodec.SNAPPY.name,
+          OrcCompressionCodec.ZLIB.name,
+          OrcCompressionCodec.SNAPPY.name))
   }
 
   test("table-level compression is not set but session-level compressions is set ") {
@@ -314,7 +319,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
     checkForTableWithCompressProp("orc",
       tableCompressCodecs = List.empty,
       sessionCompressCodecs =
-        List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
+        List(
+          OrcCompressionCodec.NONE.name,
+          OrcCompressionCodec.SNAPPY.name,
+          OrcCompressionCodec.ZLIB.name))
   }
 
   def checkTableWriteWithCompressionCodecs(format: String, compressCodecs: List[String]): Unit = {
@@ -355,6 +363,9 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         ParquetCompressionCodec.GZIP.name))
     checkTableWriteWithCompressionCodecs(
       "orc",
-      List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
+      List(
+        OrcCompressionCodec.NONE.name,
+        OrcCompressionCodec.SNAPPY.name,
+        OrcCompressionCodec.ZLIB.name))
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 78365d25c89..55cbf591303 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -2710,7 +2711,7 @@ class HiveDDLSuite
   }
 
   Seq(
-    ("orc", "ZLIB"),
+    ("orc", OrcCompressionCodec.ZLIB.name()),
     ("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) =>
     test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
       withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
@@ -2768,7 +2769,7 @@ class HiveDDLSuite
             assert(DDLUtils.isHiveTable(table))
             assert(table.storage.serde.get.contains("orc"))
             val properties = table.properties
-            assert(properties.get("orc.compress") == Some("ZLIB"))
+            assert(properties.get("orc.compress") == Some(OrcCompressionCodec.ZLIB.name()))
             assert(properties.get("orc.compress.size") == Some("1001"))
             assert(properties.get("orc.row.index.stride") == Some("2002"))
             assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
@@ -2780,7 +2781,7 @@ class HiveDDLSuite
             val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
 
             Utils.tryWithResource(getReader(maybeFile.head.getCanonicalPath)) { reader =>
-              assert(reader.getCompressionKind.name === "ZLIB")
+              assert(reader.getCompressionKind.name === OrcCompressionCodec.ZLIB.name())
               assert(reader.getCompressionSize == 1001)
               assert(reader.getRowIndexStride == 2002)
             }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index e9b6bd28823..aa2f110ceac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.orc
 import java.io.File
 
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._
@@ -99,7 +99,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       val orcFilePath = maybeOrcFile.get.toPath.toString
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(orcFilePath).get.getCompression
-      assert(CompressionKind.ZLIB.name() === expectedCompressionKind.name())
+      assert(OrcCompressionCodec.ZLIB.name() === expectedCompressionKind.name())
 
       val copyDf = spark
         .read
@@ -114,7 +114,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
-      assert(CompressionKind.SNAPPY.name() === expectedCompressionKind.name())
+      assert(OrcCompressionCodec.SNAPPY.name() === expectedCompressionKind.name())
     }
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index 0330ce51a2a..c6ff7931410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -21,12 +21,11 @@ import java.io.File
 
 import scala.util.Random
 
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
-
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -48,7 +47,7 @@ object OrcReadBenchmark extends SqlBasedBenchmark {
 
   override def getSparkSession: SparkSession = {
     val conf = new SparkConf()
-    conf.set("orc.compression", CompressionKind.SNAPPY.name())
+    conf.set("orc.compression", OrcCompressionCodec.SNAPPY.name())
 
     val sparkSession = SparkSession.builder()
       .master("local[1]")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org