You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/31 01:13:02 UTC
spark git commit: [SPARK-24952][SQL] Support LZMA2 compression by
Avro datasource
Repository: spark
Updated Branches:
refs/heads/master 2fbe294cf -> d20c10fdf
[SPARK-24952][SQL] Support LZMA2 compression by Avro datasource
## What changes were proposed in this pull request?
In the PR, I propose to support the `LZMA2` (`XZ`) and `BZIP2` compression codecs in the `AVRO` datasource on write, since these codecs may offer better characteristics, such as compression ratio and speed, compared to the already supported `snappy` and `deflate` codecs.
## How was this patch tested?
It was tested manually and by an existing test, which was extended to also check the `xz` and `bzip2` codecs.
Author: Maxim Gekk <ma...@databricks.com>
Closes #21902 from MaxGekk/avro-xz-bzip2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d20c10fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d20c10fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d20c10fd
Branch: refs/heads/master
Commit: d20c10fdf382acf43a7e6a541923bd078e19ca75
Parents: 2fbe294
Author: Maxim Gekk <ma...@databricks.com>
Authored: Tue Jul 31 09:12:57 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Tue Jul 31 09:12:57 2018 +0800
----------------------------------------------------------------------
.../apache/spark/sql/avro/AvroFileFormat.scala | 40 +++++++++-----------
.../org/apache/spark/sql/avro/AvroOptions.scala | 2 +-
.../org/apache/spark/sql/avro/AvroSuite.scala | 14 ++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 6 ++-
4 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index e0159b9..7db452b 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -23,7 +23,8 @@ import java.net.URI
import scala.util.control.NonFatal
import org.apache.avro.Schema
-import org.apache.avro.file.{DataFileConstants, DataFileReader}
+import org.apache.avro.file.DataFileConstants._
+import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
@@ -116,27 +117,22 @@ private[avro] class AvroFileFormat extends FileFormat
dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
AvroJob.setOutputKeySchema(job, outputAvroSchema)
- val COMPRESS_KEY = "mapred.output.compress"
-
- parsedOptions.compression match {
- case "uncompressed" =>
- logInfo("writing uncompressed Avro records")
- job.getConfiguration.setBoolean(COMPRESS_KEY, false)
-
- case "snappy" =>
- logInfo("compressing Avro output using Snappy")
- job.getConfiguration.setBoolean(COMPRESS_KEY, true)
- job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
-
- case "deflate" =>
- val deflateLevel = spark.sessionState.conf.avroDeflateLevel
- logInfo(s"compressing Avro output using deflate (level=$deflateLevel)")
- job.getConfiguration.setBoolean(COMPRESS_KEY, true)
- job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
- job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
-
- case unknown: String =>
- logError(s"unsupported compression codec $unknown")
+
+ if (parsedOptions.compression == "uncompressed") {
+ job.getConfiguration.setBoolean("mapred.output.compress", false)
+ } else {
+ job.getConfiguration.setBoolean("mapred.output.compress", true)
+ logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
+ val codec = parsedOptions.compression match {
+ case DEFLATE_CODEC =>
+ val deflateLevel = spark.sessionState.conf.avroDeflateLevel
+ logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
+ job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+ DEFLATE_CODEC
+ case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
+ case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
+ }
+ job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
}
new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 0f59007..67f5634 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -72,7 +72,7 @@ class AvroOptions(
/**
* The `compression` option allows to specify a compression codec used in write.
- * Currently supported codecs are `uncompressed`, `snappy` and `deflate`.
+ * Currently supported codecs are `uncompressed`, `snappy`, `deflate`, `bzip2` and `xz`.
* If the option is not set, the `spark.sql.avro.compression.codec` config is taken into
* account. If the former one is not set too, the `snappy` codec is used by default.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index f59c2cc..c221c4f 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.avro
import java.io._
import java.net.URL
-import java.nio.file.{Files, Path, Paths}
+import java.nio.file.{Files, Paths}
import java.sql.{Date, Timestamp}
import java.util.{TimeZone, UUID}
@@ -368,12 +368,18 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("write with compression - sql configs") {
withTempPath { dir =>
val uncompressDir = s"$dir/uncompress"
+ val bzip2Dir = s"$dir/bzip2"
+ val xzDir = s"$dir/xz"
val deflateDir = s"$dir/deflate"
val snappyDir = s"$dir/snappy"
val df = spark.read.format("avro").load(testAvro)
spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed")
df.write.format("avro").save(uncompressDir)
+ spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "bzip2")
+ df.write.format("avro").save(bzip2Dir)
+ spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "xz")
+ df.write.format("avro").save(xzDir)
spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate")
spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
df.write.format("avro").save(deflateDir)
@@ -381,11 +387,15 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
df.write.format("avro").save(snappyDir)
val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
+ val bzip2Size = FileUtils.sizeOfDirectory(new File(bzip2Dir))
+ val xzSize = FileUtils.sizeOfDirectory(new File(xzDir))
val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
assert(uncompressSize > deflateSize)
assert(snappySize > deflateSize)
+ assert(snappySize > bzip2Size)
+ assert(bzip2Size > xzSize)
}
}
@@ -921,6 +931,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
checkCodec(df, path, "uncompressed")
checkCodec(df, path, "deflate")
checkCodec(df, path, "snappy")
+ checkCodec(df, path, "bzip2")
+ checkCodec(df, path, "xz")
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/d20c10fd/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a269e21..edc1a48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,6 +27,7 @@ import scala.collection.immutable
import scala.util.matching.Regex
import org.apache.hadoop.fs.Path
+import org.tukaani.xz.LZMA2Options
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
@@ -1437,9 +1438,10 @@ object SQLConf {
.createWithDefault(20)
val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
- .doc("Compression codec used in writing of AVRO files. Default codec is snappy.")
+ .doc("Compression codec used in writing of AVRO files. Supported codecs: " +
+ "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
.stringConf
- .checkValues(Set("uncompressed", "deflate", "snappy"))
+ .checkValues(Set("uncompressed", "deflate", "snappy", "bzip2", "xz"))
.createWithDefault("snappy")
val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org