Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/31 01:13:02 UTC

spark git commit: [SPARK-24952][SQL] Support LZMA2 compression by Avro datasource

Repository: spark
Updated Branches:
  refs/heads/master 2fbe294cf -> d20c10fdf


[SPARK-24952][SQL] Support LZMA2 compression by Avro datasource

## What changes were proposed in this pull request?

In this PR, I propose to support the `LZMA2` (`XZ`) and `BZIP2` compression codecs in the `AVRO` datasource on write, since these codecs may offer better characteristics, such as compression ratio and speed, than the already supported `snappy` and `deflate` codecs.
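
For illustration, a minimal write-side usage sketch of the new codecs (assuming a running `SparkSession`, the `spark-avro` module on the classpath, and placeholder paths; none of this snippet is taken from the patch itself):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-codecs").getOrCreate()
val df = spark.range(1000).toDF("id")

// Per-write codec selection via the `compression` option:
df.write.format("avro").option("compression", "xz").save("/tmp/ids_xz")
df.write.format("avro").option("compression", "bzip2").save("/tmp/ids_bzip2")

// Session-wide default via the SQL config extended by this patch:
spark.conf.set("spark.sql.avro.compression.codec", "xz")
df.write.format("avro").save("/tmp/ids_xz_default")
```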

## How was this patch tested?

It was tested manually and by an existing test, which was extended to also check the `xz` and `bzip2` codecs.

Author: Maxim Gekk <ma...@databricks.com>

Closes #21902 from MaxGekk/avro-xz-bzip2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d20c10fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d20c10fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d20c10fd

Branch: refs/heads/master
Commit: d20c10fdf382acf43a7e6a541923bd078e19ca75
Parents: 2fbe294
Author: Maxim Gekk <ma...@databricks.com>
Authored: Tue Jul 31 09:12:57 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Tue Jul 31 09:12:57 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 40 +++++++++-----------
 .../org/apache/spark/sql/avro/AvroOptions.scala |  2 +-
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 14 ++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  6 ++-
 4 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e0159b9..7db452b 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -23,7 +23,8 @@ import java.net.URI
 import scala.util.control.NonFatal
 
 import org.apache.avro.Schema
-import org.apache.avro.file.{DataFileConstants, DataFileReader}
+import org.apache.avro.file.DataFileConstants._
+import org.apache.avro.file.DataFileReader
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
 import org.apache.avro.mapreduce.AvroJob
@@ -116,27 +117,22 @@ private[avro] class AvroFileFormat extends FileFormat
       dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
-    val COMPRESS_KEY = "mapred.output.compress"
-
-    parsedOptions.compression match {
-      case "uncompressed" =>
-        logInfo("writing uncompressed Avro records")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
-      case "snappy" =>
-        logInfo("compressing Avro output using Snappy")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
-      case "deflate" =>
-        val deflateLevel = spark.sessionState.conf.avroDeflateLevel
-        logInfo(s"compressing Avro output using deflate (level=$deflateLevel)")
-        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
-        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
-        job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
-      case unknown: String =>
-        logError(s"unsupported compression codec $unknown")
+
+    if (parsedOptions.compression == "uncompressed") {
+      job.getConfiguration.setBoolean("mapred.output.compress", false)
+    } else {
+      job.getConfiguration.setBoolean("mapred.output.compress", true)
+      logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
+      val codec = parsedOptions.compression match {
+        case DEFLATE_CODEC =>
+          val deflateLevel = spark.sessionState.conf.avroDeflateLevel
+          logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
+          job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+          DEFLATE_CODEC
+        case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
+        case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
+      }
+      job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
     }
 
     new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
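
Note the behavioral change in the hunk above: an unrecognized codec previously only triggered a `logError` and the write continued, whereas the rewritten block fails fast with an `IllegalArgumentException`. A sketch of the new failure mode (the codec name and path are placeholders):

```scala
// Throws IllegalArgumentException("Invalid compression codec: lzo"),
// since "lzo" matches none of the supported codec constants.
df.write.format("avro").option("compression", "lzo").save("/tmp/bad_codec")
```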

http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 0f59007..67f5634 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -72,7 +72,7 @@ class AvroOptions(
 
   /**
    * The `compression` option allows to specify a compression codec used in write.
-   * Currently supported codecs are `uncompressed`, `snappy` and `deflate`.
+   * Currently supported codecs are `uncompressed`, `snappy`, `deflate`, `bzip2` and `xz`.
    * If the option is not set, the `spark.sql.avro.compression.codec` config is taken into
    * account. If the former one is not set too, the `snappy` codec is used by default.
    */
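
A brief sketch of the resolution order described in the comment above: the per-write option wins over the session config, which in turn wins over the `snappy` default (assuming a `SparkSession` named `spark` and a DataFrame `df`; the path is a placeholder):

```scala
// The session default asks for deflate...
spark.conf.set("spark.sql.avro.compression.codec", "deflate")

// ...but the per-write `compression` option takes precedence, so this
// output is bzip2-compressed:
df.write.format("avro").option("compression", "bzip2").save("/tmp/option_wins")

// With neither the option nor the config set, the codec falls back to snappy.
```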

http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f59c2cc..c221c4f 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.avro
 
 import java.io._
 import java.net.URL
-import java.nio.file.{Files, Path, Paths}
+import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
 import java.util.{TimeZone, UUID}
 
@@ -368,12 +368,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   test("write with compression - sql configs") {
     withTempPath { dir =>
       val uncompressDir = s"$dir/uncompress"
+      val bzip2Dir = s"$dir/bzip2"
+      val xzDir = s"$dir/xz"
       val deflateDir = s"$dir/deflate"
       val snappyDir = s"$dir/snappy"
 
       val df = spark.read.format("avro").load(testAvro)
       spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed")
       df.write.format("avro").save(uncompressDir)
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "bzip2")
+      df.write.format("avro").save(bzip2Dir)
+      spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "xz")
+      df.write.format("avro").save(xzDir)
       spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate")
       spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
       df.write.format("avro").save(deflateDir)
@@ -381,11 +387,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       df.write.format("avro").save(snappyDir)
 
       val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
+      val bzip2Size = FileUtils.sizeOfDirectory(new File(bzip2Dir))
+      val xzSize = FileUtils.sizeOfDirectory(new File(xzDir))
       val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
       val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
 
       assert(uncompressSize > deflateSize)
       assert(snappySize > deflateSize)
+      assert(snappySize > bzip2Size)
+      assert(bzip2Size > xzSize)
     }
   }
 
@@ -921,6 +931,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       checkCodec(df, path, "uncompressed")
       checkCodec(df, path, "deflate")
       checkCodec(df, path, "snappy")
+      checkCodec(df, path, "bzip2")
+      checkCodec(df, path, "xz")
     }
   }
 }
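
For reference, one way to confirm which codec a written file actually carries is to read the `avro.codec` entry from the Avro container header, which is roughly what a helper like `checkCodec` can rely on (a sketch against Avro's public API; the helper name is illustrative):

```scala
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Returns the codec name recorded in the file header, e.g. "xz" or "bzip2"
// ("null" denotes an uncompressed file).
def codecOf(avroFile: File): String = {
  val reader = new DataFileReader[GenericRecord](avroFile, new GenericDatumReader[GenericRecord]())
  try reader.getMetaString("avro.codec") finally reader.close()
}
```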

http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a269e21..edc1a48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,6 +27,7 @@ import scala.collection.immutable
 import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
+import org.tukaani.xz.LZMA2Options
 
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.internal.Logging
@@ -1437,9 +1438,10 @@ object SQLConf {
     .createWithDefault(20)
 
   val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
-    .doc("Compression codec used in writing of AVRO files. Default codec is snappy.")
+    .doc("Compression codec used in writing of AVRO files. Supported codecs: " +
+      "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
     .stringConf
-    .checkValues(Set("uncompressed", "deflate", "snappy"))
+    .checkValues(Set("uncompressed", "deflate", "snappy", "bzip2", "xz"))
     .createWithDefault("snappy")
 
   val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
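
Because the entry is declared with `checkValues`, an out-of-set value is rejected when the config is set, rather than surfacing later in the write path (a sketch; the exact error message comes from Spark's config validation):

```scala
// Throws IllegalArgumentException: "lzo" is not one of the checked values.
spark.conf.set("spark.sql.avro.compression.codec", "lzo")
```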


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org