You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2023/10/30 05:22:17 UTC
(spark) branch master updated: [SPARK-45664][SQL] Introduce a mapper for orc compression codecs
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f12bc05e440 [SPARK-45664][SQL] Introduce a mapper for orc compression codecs
f12bc05e440 is described below
commit f12bc05e44099f24c470466bf777473744ab893d
Author: Jiaan Geng <be...@163.com>
AuthorDate: Sun Oct 29 22:22:07 2023 -0700
[SPARK-45664][SQL] Introduce a mapper for orc compression codecs
### What changes were proposed in this pull request?
Currently, Spark supports all the ORC compression codecs, but the compression codecs supported by ORC and those supported by Spark do not correspond one-to-one, because Spark introduces two additional codec names, `NONE` and `UNCOMPRESSED`.
On the other hand, there are a lot of magic strings copied from the ORC compression codecs. This forces developers to manually maintain consistency between them, which is error-prone and reduces development efficiency.
### Why are the changes needed?
Makes it easier for developers to use ORC compression codecs correctly and consistently.
### Does this PR introduce _any_ user-facing change?
'No'.
Introduce a new class.
### How was this patch tested?
Existing test cases.
### Was this patch authored or co-authored using generative AI tooling?
'No'.
Closes #43528 from beliefer/SPARK-45664.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../datasources/orc/OrcCompressionCodec.java | 56 ++++++++++++++++++++++
.../sql/execution/datasources/orc/OrcOptions.scala | 11 ++---
.../sql/execution/datasources/orc/OrcUtils.scala | 12 ++---
.../BuiltInDataSourceWriteBenchmark.scala | 4 +-
.../benchmark/DataSourceReadBenchmark.scala | 4 +-
.../benchmark/FilterPushdownBenchmark.scala | 3 +-
.../datasources/FileSourceCodecSuite.scala | 5 +-
.../execution/datasources/orc/OrcQuerySuite.scala | 26 +++++-----
.../execution/datasources/orc/OrcSourceSuite.scala | 29 +++++++----
.../spark/sql/hive/CompressionCodecSuite.scala | 23 ++++++---
.../spark/sql/hive/execution/HiveDDLSuite.scala | 7 +--
.../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 6 +--
.../spark/sql/hive/orc/OrcReadBenchmark.scala | 5 +-
13 files changed, 134 insertions(+), 57 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java
new file mode 100644
index 00000000000..c8e57969068
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcCompressionCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.orc.CompressionKind;
+
+/**
+ * A mapper class from Spark supported orc compression codecs to orc compression codecs.
+ */
+public enum OrcCompressionCodec {
+ NONE(CompressionKind.NONE),
+ UNCOMPRESSED(CompressionKind.NONE),
+ ZLIB(CompressionKind.ZLIB),
+ SNAPPY(CompressionKind.SNAPPY),
+ LZO(CompressionKind.LZO),
+ LZ4(CompressionKind.LZ4),
+ ZSTD(CompressionKind.ZSTD);
+
+ private final CompressionKind compressionKind;
+
+ OrcCompressionCodec(CompressionKind compressionKind) {
+ this.compressionKind = compressionKind;
+ }
+
+ public CompressionKind getCompressionKind() {
+ return this.compressionKind;
+ }
+
+ public static final Map<String, String> codecNameMap =
+ Arrays.stream(OrcCompressionCodec.values()).collect(
+ Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
+
+ public String lowerCaseName() {
+ return codecNameMap.get(this.name());
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
index 1c819f07038..4bed600fa4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOptions.scala
@@ -75,14 +75,9 @@ object OrcOptions extends DataSourceOptions {
val COMPRESSION = newOption("compression")
// The ORC compression short names
- private val shortOrcCompressionCodecNames = Map(
- "none" -> "NONE",
- "uncompressed" -> "NONE",
- "snappy" -> "SNAPPY",
- "zlib" -> "ZLIB",
- "lzo" -> "LZO",
- "lz4" -> "LZ4",
- "zstd" -> "ZSTD")
+ private val shortOrcCompressionCodecNames = OrcCompressionCodec.values().map {
+ mapper => mapper.lowerCaseName() -> mapper.getCompressionKind.name()
+ }.toMap
def getORCCompressionCodecName(name: String): String = shortOrcCompressionCodecNames(name)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
index d6d42e74da1..c4490b95e3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
@@ -49,12 +49,12 @@ object OrcUtils extends Logging {
// The extensions for ORC compression codecs
val extensionsForCompressionCodecNames = Map(
- "NONE" -> "",
- "SNAPPY" -> ".snappy",
- "ZLIB" -> ".zlib",
- "ZSTD" -> ".zstd",
- "LZ4" -> ".lz4",
- "LZO" -> ".lzo")
+ OrcCompressionCodec.NONE.name() -> "",
+ OrcCompressionCodec.SNAPPY.name() -> ".snappy",
+ OrcCompressionCodec.ZLIB.name() -> ".zlib",
+ OrcCompressionCodec.ZSTD.name() -> ".zstd",
+ OrcCompressionCodec.LZ4.name() -> ".lz4",
+ OrcCompressionCodec.LZO.name() -> ".lzo")
val CATALYST_TYPE_ATTRIBUTE_NAME = "spark.sql.catalyst.type"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
index ba3228878ec..a8b448c8dc9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BuiltInDataSourceWriteBenchmark.scala
@@ -21,6 +21,7 @@ import java.util.Locale
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
@@ -56,7 +57,8 @@ object BuiltInDataSourceWriteBenchmark extends DataSourceWriteBenchmark {
spark.conf.set(SQLConf.PARQUET_COMPRESSION.key,
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
- spark.conf.set(SQLConf.ORC_COMPRESSION.key, "snappy")
+ spark.conf.set(SQLConf.ORC_COMPRESSION.key,
+ OrcCompressionCodec.SNAPPY.lowerCaseName())
formats.foreach { format =>
runBenchmark(s"$format writer benchmark") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
index a8736c04151..bc271235f9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceReadBenchmark.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, VectorizedParquetRecordReader}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -116,7 +117,8 @@ object DataSourceReadBenchmark extends SqlBasedBenchmark {
}
private def saveAsOrcTable(df: DataFrameWriter[Row], dir: String): Unit = {
- df.mode("overwrite").option("compression", "snappy").orc(dir)
+ df.mode("overwrite").option("compression",
+ OrcCompressionCodec.SNAPPY.lowerCaseName()).orc(dir)
spark.read.orc(dir).createOrReplaceTempView("orcTable")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 10781ec90fa..1a788bf5f2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -25,6 +25,7 @@ import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
@@ -51,7 +52,7 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
.set("spark.master", "local[1]")
.setIfMissing("spark.driver.memory", "3g")
.setIfMissing("spark.executor.memory", "3g")
- .setIfMissing("orc.compression", "snappy")
+ .setIfMissing("orc.compression", OrcCompressionCodec.SNAPPY.lowerCaseName())
.setIfMissing("spark.sql.parquet.compression.codec",
ParquetCompressionCodec.SNAPPY.name().toLowerCase(Locale.ROOT))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
index 1f1805a02d7..e4d9e13c2b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceCodecSuite.scala
@@ -22,6 +22,7 @@ import java.util.Locale
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -73,6 +74,6 @@ class OrcCodecSuite extends FileSourceCodecSuite {
override def format: String = "orc"
override val codecConfigName: String = SQLConf.ORC_COMPRESSION.key
- override protected def availableCodecs = Seq("none", "uncompressed", "snappy",
- "zlib", "zstd", "lz4", "lzo")
+ override protected def availableCodecs =
+ OrcCompressionCodec.values().map(_.lowerCaseName()).toSeq
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 39447ed71a8..7d666729bb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -190,7 +190,7 @@ abstract class OrcQueryTest extends OrcTest {
// Respect `orc.compress` (i.e., OrcConf.COMPRESS).
withTempPath { file =>
spark.range(0, 10).write
- .option(COMPRESS.getAttribute, "ZLIB")
+ .option(COMPRESS.getAttribute, OrcCompressionCodec.ZLIB.name())
.orc(file.getCanonicalPath)
val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -199,15 +199,15 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
- assert("ZLIB" === reader.getCompressionKind.name)
+ assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
}
}
// `compression` overrides `orc.compress`.
withTempPath { file =>
spark.range(0, 10).write
- .option("compression", "ZLIB")
- .option(COMPRESS.getAttribute, "SNAPPY")
+ .option("compression", OrcCompressionCodec.ZLIB.name())
+ .option(COMPRESS.getAttribute, OrcCompressionCodec.SNAPPY.name())
.orc(file.getCanonicalPath)
val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -216,7 +216,7 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
- assert("ZLIB" === reader.getCompressionKind.name)
+ assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
}
}
}
@@ -224,7 +224,7 @@ abstract class OrcQueryTest extends OrcTest {
test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
withTempPath { file =>
spark.range(0, 10).write
- .option("compression", "ZLIB")
+ .option("compression", OrcCompressionCodec.ZLIB.name())
.orc(file.getCanonicalPath)
val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".zlib.orc"))
@@ -233,13 +233,13 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
- assert("ZLIB" === reader.getCompressionKind.name)
+ assert(OrcCompressionCodec.ZLIB.name() === reader.getCompressionKind.name)
}
}
withTempPath { file =>
spark.range(0, 10).write
- .option("compression", "SNAPPY")
+ .option("compression", OrcCompressionCodec.SNAPPY.name())
.orc(file.getCanonicalPath)
val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".snappy.orc"))
@@ -248,13 +248,13 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
- assert("SNAPPY" === reader.getCompressionKind.name)
+ assert(OrcCompressionCodec.SNAPPY.name() === reader.getCompressionKind.name)
}
}
withTempPath { file =>
spark.range(0, 10).write
- .option("compression", "NONE")
+ .option("compression", OrcCompressionCodec.NONE.name())
.orc(file.getCanonicalPath)
val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".orc"))
@@ -263,7 +263,7 @@ abstract class OrcQueryTest extends OrcTest {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
- assert("NONE" === reader.getCompressionKind.name)
+ assert(OrcCompressionCodec.NONE.name() === reader.getCompressionKind.name)
}
}
}
@@ -647,7 +647,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
test("LZO compression options for writing to an ORC file") {
withTempPath { file =>
spark.range(0, 10).write
- .option("compression", "LZO")
+ .option("compression", OrcCompressionCodec.LZO.name())
.orc(file.getCanonicalPath)
val maybeOrcFile = file.listFiles().find(_.getName.endsWith(".lzo.orc"))
@@ -656,7 +656,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
val orcFilePath = new Path(maybeOrcFile.get.getAbsolutePath)
val conf = OrcFile.readerOptions(new Configuration())
Utils.tryWithResource(OrcFile.createReader(orcFilePath, conf)) { reader =>
- assert("LZO" === reader.getCompressionKind.name)
+ assert(OrcCompressionCodec.LZO.name() === reader.getCompressionKind.name)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
index 2ed24943908..4abcb4a7ef1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala
@@ -324,29 +324,38 @@ abstract class OrcSuite
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
val conf = spark.sessionState.conf
- val option = new OrcOptions(Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> "NONE"), conf)
- assert(option.compressionCodec == "NONE")
+ val option = new OrcOptions(
+ Map(COMPRESS.getAttribute.toUpperCase(Locale.ROOT) -> OrcCompressionCodec.NONE.name()), conf)
+ assert(option.compressionCodec == OrcCompressionCodec.NONE.name())
}
test("SPARK-21839: Add SQL config for ORC compression") {
val conf = spark.sessionState.conf
// Test if the default of spark.sql.orc.compression.codec is snappy
- assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "SNAPPY")
+ assert(new OrcOptions(
+ Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.SNAPPY.name())
// OrcOptions's parameters have a higher priority than SQL configuration.
// `compression` -> `orc.compression` -> `spark.sql.orc.compression.codec`
withSQLConf(SQLConf.ORC_COMPRESSION.key -> "uncompressed") {
- assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == "NONE")
- val map1 = Map(COMPRESS.getAttribute -> "zlib")
- val map2 = Map(COMPRESS.getAttribute -> "zlib", "compression" -> "lzo")
- assert(new OrcOptions(map1, conf).compressionCodec == "ZLIB")
- assert(new OrcOptions(map2, conf).compressionCodec == "LZO")
+ assert(new OrcOptions(
+ Map.empty[String, String], conf).compressionCodec == OrcCompressionCodec.NONE.name())
+ val zlibCodec = OrcCompressionCodec.ZLIB.lowerCaseName()
+ val lzoCodec = OrcCompressionCodec.LZO.lowerCaseName()
+ val map1 = Map(COMPRESS.getAttribute -> zlibCodec)
+ val map2 = Map(COMPRESS.getAttribute -> zlibCodec, "compression" -> lzoCodec)
+ assert(new OrcOptions(map1, conf).compressionCodec == OrcCompressionCodec.ZLIB.name())
+ assert(new OrcOptions(map2, conf).compressionCodec == OrcCompressionCodec.LZO.name())
}
// Test all the valid options of spark.sql.orc.compression.codec
- Seq("NONE", "UNCOMPRESSED", "SNAPPY", "ZLIB", "LZO", "ZSTD", "LZ4").foreach { c =>
+ OrcCompressionCodec.values().map(_.name()).foreach { c =>
withSQLConf(SQLConf.ORC_COMPRESSION.key -> c) {
- val expected = if (c == "UNCOMPRESSED") "NONE" else c
+ val expected = if (c == OrcCompressionCodec.UNCOMPRESSED.name()) {
+ OrcCompressionCodec.NONE.name()
+ } else {
+ c
+ }
assert(new OrcOptions(Map.empty[String, String], conf).compressionCodec == expected)
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
index df28e7b4485..52138ae4558 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala
@@ -23,12 +23,11 @@ import java.util.Locale
import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.apache.orc.OrcConf.COMPRESS
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.execution.datasources.orc.OrcOptions
+import org.apache.spark.sql.execution.datasources.orc.{OrcCompressionCodec, OrcOptions}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetOptions, ParquetTest}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -299,9 +298,15 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
ParquetCompressionCodec.SNAPPY.name))
checkForTableWithCompressProp("orc",
tableCompressCodecs =
- List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name),
+ List(
+ OrcCompressionCodec.NONE.name,
+ OrcCompressionCodec.SNAPPY.name,
+ OrcCompressionCodec.ZLIB.name),
sessionCompressCodecs =
- List(CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name, CompressionKind.SNAPPY.name))
+ List(
+ OrcCompressionCodec.SNAPPY.name,
+ OrcCompressionCodec.ZLIB.name,
+ OrcCompressionCodec.SNAPPY.name))
}
test("table-level compression is not set but session-level compressions is set ") {
@@ -314,7 +319,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
checkForTableWithCompressProp("orc",
tableCompressCodecs = List.empty,
sessionCompressCodecs =
- List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
+ List(
+ OrcCompressionCodec.NONE.name,
+ OrcCompressionCodec.SNAPPY.name,
+ OrcCompressionCodec.ZLIB.name))
}
def checkTableWriteWithCompressionCodecs(format: String, compressCodecs: List[String]): Unit = {
@@ -355,6 +363,9 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
ParquetCompressionCodec.GZIP.name))
checkTableWriteWithCompressionCodecs(
"orc",
- List(CompressionKind.NONE.name, CompressionKind.SNAPPY.name, CompressionKind.ZLIB.name))
+ List(
+ OrcCompressionCodec.NONE.name,
+ OrcCompressionCodec.SNAPPY.name,
+ OrcCompressionCodec.ZLIB.name))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 78365d25c89..55cbf591303 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompressionCodec, ParquetFooterReader}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
@@ -2710,7 +2711,7 @@ class HiveDDLSuite
}
Seq(
- ("orc", "ZLIB"),
+ ("orc", OrcCompressionCodec.ZLIB.name()),
("parquet", ParquetCompressionCodec.GZIP.name)).foreach { case (fileFormat, compression) =>
test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
@@ -2768,7 +2769,7 @@ class HiveDDLSuite
assert(DDLUtils.isHiveTable(table))
assert(table.storage.serde.get.contains("orc"))
val properties = table.properties
- assert(properties.get("orc.compress") == Some("ZLIB"))
+ assert(properties.get("orc.compress") == Some(OrcCompressionCodec.ZLIB.name()))
assert(properties.get("orc.compress.size") == Some("1001"))
assert(properties.get("orc.row.index.stride") == Some("2002"))
assert(properties.get("hive.exec.orc.default.block.size") == Some("3003"))
@@ -2780,7 +2781,7 @@ class HiveDDLSuite
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
Utils.tryWithResource(getReader(maybeFile.head.getCanonicalPath)) { reader =>
- assert(reader.getCompressionKind.name === "ZLIB")
+ assert(reader.getCompressionKind.name === OrcCompressionCodec.ZLIB.name())
assert(reader.getCompressionSize == 1001)
assert(reader.getRowIndexStride == 2002)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index e9b6bd28823..aa2f110ceac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.orc
import java.io.File
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelationTest
import org.apache.spark.sql.types._
@@ -99,7 +99,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
OrcFileOperator.getFileReader(orcFilePath).get.getCompression
- assert(CompressionKind.ZLIB.name() === expectedCompressionKind.name())
+ assert(OrcCompressionCodec.ZLIB.name() === expectedCompressionKind.name())
val copyDf = spark
.read
@@ -114,7 +114,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
- assert(CompressionKind.SNAPPY.name() === expectedCompressionKind.name())
+ assert(OrcCompressionCodec.SNAPPY.name() === expectedCompressionKind.name())
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
index 0330ce51a2a..c6ff7931410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala
@@ -21,12 +21,11 @@ import java.io.File
import scala.util.Random
-import org.apache.hadoop.hive.ql.io.orc.CompressionKind
-
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.benchmark.SqlBasedBenchmark
+import org.apache.spark.sql.execution.datasources.orc.OrcCompressionCodec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -48,7 +47,7 @@ object OrcReadBenchmark extends SqlBasedBenchmark {
override def getSparkSession: SparkSession = {
val conf = new SparkConf()
- conf.set("orc.compression", CompressionKind.SNAPPY.name())
+ conf.set("orc.compression", OrcCompressionCodec.SNAPPY.name())
val sparkSession = SparkSession.builder()
.master("local[1]")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org