You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2024/01/18 16:26:47 UTC

(spark) branch master updated: [SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 01bb1b1a3dbf [SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files
01bb1b1a3dbf is described below

commit 01bb1b1a3dbfc68f41d9b13de863d26d587c7e2f
Author: Kent Yao <ya...@apache.org>
AuthorDate: Thu Jan 18 08:26:38 2024 -0800

    [SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files
    
    ### What changes were proposed in this pull request?
    
    This PR introduces 2 keys in the form of 'spark.sql.avro.$codecName.level' just like the existing 'spark.sql.avro.deflate.level' for standard and xz codec. W/ this patch, users are able to play the trade-off between the speed and compression ratio when they use AVRO compressed by zstd or xz.
    
    ### Why are the changes needed?
    
    Avro supports compression level for deflate, xz and zstd, but we have only supported deflate.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, new configurations added
    
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #44786 from yaooqinn/SPARK-46759.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/avro/AvroCompressionCodec.java       | 30 ++++++++++++++++------
 .../org/apache/spark/sql/avro/AvroUtils.scala      | 12 +++++----
 .../org/apache/spark/sql/avro/AvroCodecSuite.scala | 17 ++++++++++++
 docs/sql-data-sources-avro.md                      | 18 +++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 20 ++++++++++++---
 5 files changed, 81 insertions(+), 16 deletions(-)

diff --git a/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java b/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
index 5cfcfffd07a2..c927991425cd 100644
--- a/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
+++ b/connector/avro/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java
@@ -22,29 +22,43 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.*;
 
 /**
  * A mapper class from Spark supported avro compression codecs to avro compression codecs.
  */
 public enum AvroCompressionCodec {
-  UNCOMPRESSED(DataFileConstants.NULL_CODEC),
-  DEFLATE(DataFileConstants.DEFLATE_CODEC),
-  SNAPPY(DataFileConstants.SNAPPY_CODEC),
-  BZIP2(DataFileConstants.BZIP2_CODEC),
-  XZ(DataFileConstants.XZ_CODEC),
-  ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC);
+  UNCOMPRESSED(DataFileConstants.NULL_CODEC, false, -1),
+  DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),
+  SNAPPY(DataFileConstants.SNAPPY_CODEC, false, -1),
+  BZIP2(DataFileConstants.BZIP2_CODEC, false, -1),
+  XZ(DataFileConstants.XZ_CODEC, true, CodecFactory.DEFAULT_XZ_LEVEL),
+  ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);
 
   private final String codecName;
+  private final boolean supportCompressionLevel;
+  private final int defaultCompressionLevel;
 
-  AvroCompressionCodec(String codecName) {
+  AvroCompressionCodec(
+      String codecName,
+      boolean supportCompressionLevel, int defaultCompressionLevel) {
     this.codecName = codecName;
+    this.supportCompressionLevel = supportCompressionLevel;
+    this.defaultCompressionLevel = defaultCompressionLevel;
   }
 
   public String getCodecName() {
     return this.codecName;
   }
 
+  public boolean getSupportCompressionLevel() {
+    return this.supportCompressionLevel;
+  }
+
+  public int getDefaultCompressionLevel() {
+    return this.defaultCompressionLevel;
+  }
+
   private static final Map<String, String> codecNameMap =
     Arrays.stream(AvroCompressionCodec.values()).collect(
       Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 3910cf540628..d9c88e14d039 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.avro.Schema
 import org.apache.avro.file.{DataFileReader, FileReader}
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapred.FsInput
 import org.apache.avro.mapreduce.AvroJob
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
@@ -110,10 +110,12 @@ private[sql] object AvroUtils extends Logging {
           case compressed =>
             job.getConfiguration.setBoolean("mapred.output.compress", true)
             job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
-            if (compressed == DEFLATE) {
-              val deflateLevel = sqlConf.avroDeflateLevel
-              logInfo(s"Compressing Avro output using the $codecName codec at level $deflateLevel")
-              job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+            if (compressed.getSupportCompressionLevel) {
+              val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
+                compressed.getDefaultCompressionLevel.toString)
+              logInfo(s"Compressing Avro output using the $codecName codec at level $level")
+              val s = if (compressed == ZSTANDARD) "zstd" else codecName
+              job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
             } else {
               logInfo(s"Compressing Avro output using the $codecName codec")
             }
diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala
index 933b3f989ef7..256b608feaa1 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroCodecSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
 import java.util.Locale
 
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.FileSourceCodecSuite
 import org.apache.spark.sql.internal.SQLConf
 
@@ -58,4 +59,20 @@ class AvroCodecSuite extends FileSourceCodecSuite {
       parameters = Map("codecName" -> "unsupported")
     )
   }
+
+  test("SPARK-46759: compression level support for zstandard codec") {
+    Seq("9", "1").foreach { level =>
+      withSQLConf(
+        (SQLConf.AVRO_COMPRESSION_CODEC.key -> "zstandard"),
+        (SQLConf.AVRO_ZSTANDARD_LEVEL.key -> level)) {
+        withTable("avro_t") {
+          sql(
+            s"""CREATE TABLE avro_t
+               |USING $format
+               |AS SELECT 1 as id""".stripMargin)
+          checkAnswer(spark.table("avro_t"), Seq(Row(1)))
+        }
+      }
+    }
+  }
 }
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index 73327e88eb3e..e4b4963f7b5f 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -362,6 +362,24 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=va
     </td>
     <td>2.4.0</td>
   </tr>
+  <tr>
+    <td>spark.sql.avro.xz.level</td>
+    <td>6</td>
+    <td>
+      Compression level for the xz codec used in writing of AVRO files. Valid value must be in
+      the range of from 1 to 9 inclusive. The default value is 6 in the current implementation.
+    </td>
+    <td>4.0.0</td>
+  </tr>
+  <tr>
+    <td>spark.sql.avro.zstandard.level</td>
+    <td>3</td>
+    <td>
+      Compression level for the zstandard codec used in writing of AVRO files.
+      The default value is 3 in the current implementation.
+    </td>
+    <td>4.0.0</td>
+  </tr>
   <tr>
     <td>spark.sql.avro.datetimeRebaseModeInRead</td>
     <td><code>EXCEPTION</code></td>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index eb5233bfb123..61c7b2457b11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3625,7 +3625,23 @@ object SQLConf {
     .version("2.4.0")
     .intConf
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
-    .createWithDefault(Deflater.DEFAULT_COMPRESSION)
+    .createOptional
+
+  val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.zx.level")
+    .doc("Compression level for the xz codec used in writing of AVRO files. " +
+      "Valid value must be in the range of from 1 to 9 inclusive " +
+      "The default value is 6.")
+    .version("4.0.0")
+    .intConf
+    .checkValue(v => v > 0 && v <= 9, "The value must be in the range of from 1 to 9 inclusive.")
+    .createOptional
+
+  val AVRO_ZSTANDARD_LEVEL = buildConf("spark.sql.avro.zstandard.level")
+    .doc("Compression level for the zstandard codec used in writing of AVRO files. " +
+      "The default value is 3.")
+    .version("4.0.0")
+    .intConf
+    .createOptional
 
   val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
     .internal()
@@ -5421,8 +5437,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
 
-  def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
-
   def replaceDatabricksSparkAvroEnabled: Boolean =
     getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org