You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2024/01/18 16:26:47 UTC
(spark) branch master updated: [SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 01bb1b1a3dbf [SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files
01bb1b1a3dbf is described below
commit 01bb1b1a3dbfc68f41d9b13de863d26d587c7e2f
Author: Kent Yao <ya...@apache.org>
AuthorDate: Thu Jan 18 08:26:38 2024 -0800
[SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files
### What changes were proposed in this pull request?
This PR introduces 2 keys in the form of 'spark.sql.avro.$codecName.level', just like the existing 'spark.sql.avro.deflate.level', for the zstandard and xz codecs. With this patch, users can trade off speed against compression ratio when they write AVRO files compressed with zstd or xz.
### Why are the changes needed?
Avro supports compression level for deflate, xz and zstd, but we have only supported deflate.
### Does this PR introduce _any_ user-facing change?
yes, new configurations added
### How was this patch tested?
new tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #44786 from yaooqinn/SPARK-46759.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../spark/sql/avro/AvroCompressionCodec.java | 30 ++++++++++++++++------
.../org/apache/spark/sql/avro/AvroUtils.scala | 12 +++++----
.../org/apache/spark/sql/avro/AvroCodecSuite.scala | 17 ++++++++++++
docs/sql-data-sources-avro.md | 18 +++++++++++++
.../org/apache/spark/sql/internal/SQLConf.scala | 20 ++++++++++++---
5 files changed, 81 insertions(+), 16 deletions(-)
diff --git a/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java b/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
index 5cfcfffd07a2..c927991425cd 100644
--- a/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
+++ b/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
@@ -22,29 +22,43 @@ import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
-import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.*;
/**
* A mapper class from Spark supported avro compression codecs to avro compression codecs.
*/
public enum AvroCompressionCodec {
- UNCOMPRESSED(DataFileConstants.NULL_CODEC),
- DEFLATE(DataFileConstants.DEFLATE_CODEC),
- SNAPPY(DataFileConstants.SNAPPY_CODEC),
- BZIP2(DataFileConstants.BZIP2_CODEC),
- XZ(DataFileConstants.XZ_CODEC),
- ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC);
+ UNCOMPRESSED(DataFileConstants.NULL_CODEC, false, -1),
+ DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),
+ SNAPPY(DataFileConstants.SNAPPY_CODEC, false, -1),
+ BZIP2(DataFileConstants.BZIP2_CODEC, false, -1),
+ XZ(DataFileConstants.XZ_CODEC, true, CodecFactory.DEFAULT_XZ_LEVEL),
+ ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);
private final String codecName;
+ private final boolean supportCompressionLevel;
+ private final int defaultCompressionLevel;
- AvroCompressionCodec(String codecName) {
+ AvroCompressionCodec(
+ String codecName,
+ boolean supportCompressionLevel, int defaultCompressionLevel) {
this.codecName = codecName;
+ this.supportCompressionLevel = supportCompressionLevel;
+ this.defaultCompressionLevel = defaultCompressionLevel;
}
public String getCodecName() {
return this.codecName;
}
+ public boolean getSupportCompressionLevel() {
+ return this.supportCompressionLevel;
+ }
+
+ public int getDefaultCompressionLevel() {
+ return this.defaultCompressionLevel;
+ }
+
private static final Map<String, String> codecNameMap =
Arrays.stream(AvroCompressionCodec.values()).collect(
Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 3910cf540628..d9c88e14d039 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, FileReader}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapred.FsInput
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
@@ -110,10 +110,12 @@ private[sql] object AvroUtils extends Logging {
case compressed =>
job.getConfiguration.setBoolean("mapred.output.compress", true)
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
- if (compressed == DEFLATE) {
- val deflateLevel = sqlConf.avroDeflateLevel
- logInfo(s"Compressing Avro output using the $codecName codec at level $deflateLevel")
- job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+ if (compressed.getSupportCompressionLevel) {
+ val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
+ compressed.getDefaultCompressionLevel.toString)
+ logInfo(s"Compressing Avro output using the $codecName codec at level $level")
+ val s = if (compressed == ZSTANDARD) "zstd" else codecName
+ job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
} else {
logInfo(s"Compressing Avro output using the $codecName codec")
}
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala
index 933b3f989ef7..256b608feaa1 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
import java.util.Locale
import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.FileSourceCodecSuite
import org.apache.spark.sql.internal.SQLConf
@@ -58,4 +59,20 @@ class AvroCodecSuite extends FileSourceCodecSuite {
parameters = Map("codecName" -> "unsupported")
)
}
+
+ test("SPARK-46759: compression level support for zstandard codec") {
+ Seq("9", "1").foreach { level =>
+ withSQLConf(
+ (SQLConf.AVRO_COMPRESSION_CODEC.key -> "zstandard"),
+ (SQLConf.AVRO_ZSTANDARD_LEVEL.key -> level)) {
+ withTable("avro_t") {
+ sql(
+ s"""CREATE TABLE avro_t
+ |USING $format
+ |AS SELECT 1 as id""".stripMargin)
+ checkAnswer(spark.table("avro_t"), Seq(Row(1)))
+ }
+ }
+ }
+ }
}
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index 73327e88eb3e..e4b4963f7b5f 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -362,6 +362,24 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=va
</td>
<td>2.4.0</td>
</tr>
+ <tr>
+ <td>spark.sql.avro.xz.level</td>
+ <td>6</td>
+ <td>
+ Compression level for the xz codec used in writing of AVRO files. Valid values must be in
+ the range from 1 to 9 inclusive. The default value is 6 in the current implementation.
+ </td>
+ <td>4.0.0</td>
+ </tr>
+ <tr>
+ <td>spark.sql.avro.zstandard.level</td>
+ <td>3</td>
+ <td>
+ Compression level for the zstandard codec used in writing of AVRO files.
+ The default value is 3 in the current implementation.
+ </td>
+ <td>4.0.0</td>
+ </tr>
<tr>
<td>spark.sql.avro.datetimeRebaseModeInRead</td>
<td><code>EXCEPTION</code></td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index eb5233bfb123..61c7b2457b11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3625,7 +3625,23 @@ object SQLConf {
.version("2.4.0")
.intConf
.checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
- .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+ .createOptional
+
+ val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
+ .doc("Compression level for the xz codec used in writing of AVRO files. " +
+ "Valid value must be in the range of from 1 to 9 inclusive. " +
+ "The default value is 6.")
+ .version("4.0.0")
+ .intConf
+ .checkValue(v => v > 0 && v <= 9, "The value must be in the range of from 1 to 9 inclusive.")
+ .createOptional
+
+ val AVRO_ZSTANDARD_LEVEL = buildConf("spark.sql.avro.zstandard.level")
+ .doc("Compression level for the zstandard codec used in writing of AVRO files. " +
+ "The default value is 3.")
+ .version("4.0.0")
+ .intConf
+ .createOptional
val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
.internal()
@@ -5421,8 +5437,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
- def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
-
def replaceDatabricksSparkAvroEnabled: Boolean =
getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org