You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/27 16:11:38 UTC
spark git commit: [SPARK-24881][SQL] New Avro option - compression
Repository: spark
Updated Branches:
refs/heads/master c9bec1d37 -> 0a0f68bae
[SPARK-24881][SQL] New Avro option - compression
## What changes were proposed in this pull request?
In this PR, I added a new option for the Avro datasource - `compression`. The option allows specifying the compression codec for saved Avro files. It is similar to the `compression` option in other datasources such as `JSON` and `CSV`.
I also added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`, and put them into `SQLConf`. If the `compression` option is not specified by the user, the first SQL config (`spark.sql.avro.compression.codec`) is taken into account.
## How was this patch tested?
I added a new test which reads the meta info from the written Avro files and checks the `avro.codec` property.
Author: Maxim Gekk <ma...@databricks.com>
Closes #21837 from MaxGekk/avro-compression.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a0f68ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a0f68ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a0f68ba
Branch: refs/heads/master
Commit: 0a0f68bae6c0a1bf30184b1e9ac6bf3805bd7511
Parents: c9bec1d
Author: Maxim Gekk <ma...@databricks.com>
Authored: Sat Jul 28 00:11:32 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Sat Jul 28 00:11:32 2018 +0800
----------------------------------------------------------------------
.../apache/spark/sql/avro/AvroFileFormat.scala | 7 +---
.../org/apache/spark/sql/avro/AvroOptions.scala | 11 +++++
.../org/apache/spark/sql/avro/AvroSuite.scala | 44 ++++++++++++++++----
.../org/apache/spark/sql/internal/SQLConf.scala | 19 +++++++++
4 files changed, 67 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0a0f68ba/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index c6b3c13..1df1c8b 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -117,11 +117,9 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
AvroJob.setOutputKeySchema(job, outputAvroSchema)
- val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
- val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
val COMPRESS_KEY = "mapred.output.compress"
- spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+ parsedOptions.compression match {
case "uncompressed" =>
log.info("writing uncompressed Avro records")
job.getConfiguration.setBoolean(COMPRESS_KEY, false)
@@ -132,8 +130,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
case "deflate" =>
- val deflateLevel = spark.conf.get(
- AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+ val deflateLevel = spark.sessionState.conf.avroDeflateLevel
log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
job.getConfiguration.setBoolean(COMPRESS_KEY, true)
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
http://git-wip-us.apache.org/repos/asf/spark/blob/0a0f68ba/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index cd9a911..0f59007 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.internal.SQLConf
/**
* Options for Avro Reader and Writer stored in case insensitive manner.
@@ -68,4 +69,14 @@ class AvroOptions(
.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
}
+
+ /**
+ * The `compression` option allows to specify a compression codec used in write.
+ * Currently supported codecs are `uncompressed`, `snappy` and `deflate`.
+ * If the option is not set, the `spark.sql.avro.compression.codec` config is taken into
+ * account. If the former one is not set too, the `snappy` codec is used by default.
+ */
+ val compression: String = {
+ parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0a0f68ba/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index a93309e..2f478c7 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -27,13 +27,14 @@ import scala.collection.JavaConverters._
import org.apache.avro.Schema
import org.apache.avro.Schema.{Field, Type}
-import org.apache.avro.file.DataFileWriter
-import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
+import org.apache.avro.file.{DataFileReader, DataFileWriter}
+import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql._
import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -364,21 +365,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
- test("write with compression") {
+ test("write with compression - sql configs") {
withTempPath { dir =>
- val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
- val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
val uncompressDir = s"$dir/uncompress"
val deflateDir = s"$dir/deflate"
val snappyDir = s"$dir/snappy"
val df = spark.read.format("avro").load(testAvro)
- spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed")
+ spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "uncompressed")
df.write.format("avro").save(uncompressDir)
- spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate")
- spark.conf.set(AVRO_DEFLATE_LEVEL, "9")
+ spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "deflate")
+ spark.conf.set(SQLConf.AVRO_DEFLATE_LEVEL.key, "9")
df.write.format("avro").save(deflateDir)
- spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy")
+ spark.conf.set(SQLConf.AVRO_COMPRESSION_CODEC.key, "snappy")
df.write.format("avro").save(snappyDir)
val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
@@ -890,4 +889,31 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
}
+
+ test("SPARK-24881: write with compression - avro options") {
+ def getCodec(dir: String): Option[String] = {
+ val files = new File(dir)
+ .listFiles()
+ .filter(_.isFile)
+ .filter(_.getName.endsWith("avro"))
+ files.map { file =>
+ val reader = new DataFileReader(file, new GenericDatumReader[Any]())
+ val r = reader.getMetaString("avro.codec")
+ r
+ }.map(v => if (v == "null") "uncompressed" else v).headOption
+ }
+ def checkCodec(df: DataFrame, dir: String, codec: String): Unit = {
+ val subdir = s"$dir/$codec"
+ df.write.option("compression", codec).format("avro").save(subdir)
+ assert(getCodec(subdir) == Some(codec))
+ }
+ withTempPath { dir =>
+ val path = dir.toString
+ val df = spark.read.format("avro").load(testAvro)
+
+ checkCodec(df, path, "uncompressed")
+ checkCodec(df, path, "deflate")
+ checkCodec(df, path, "snappy")
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/0a0f68ba/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 53423e0..a269e21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
import java.util.{Locale, NoSuchElementException, Properties, TimeZone}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
+import java.util.zip.Deflater
import scala.collection.JavaConverters._
import scala.collection.immutable
@@ -1434,6 +1435,20 @@ object SQLConf {
"This only takes effect when spark.sql.repl.eagerEval.enabled is set to true.")
.intConf
.createWithDefault(20)
+
+ val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
+ .doc("Compression codec used in writing of AVRO files. Default codec is snappy.")
+ .stringConf
+ .checkValues(Set("uncompressed", "deflate", "snappy"))
+ .createWithDefault("snappy")
+
+ val AVRO_DEFLATE_LEVEL = buildConf("spark.sql.avro.deflate.level")
+ .doc("Compression level for the deflate codec used in writing of AVRO files. " +
+ "Valid value must be in the range of from 1 to 9 inclusive or -1. " +
+ "The default value is -1 which corresponds to 6 level in the current implementation.")
+ .intConf
+ .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
+ .createWithDefault(Deflater.DEFAULT_COMPRESSION)
}
/**
@@ -1820,6 +1835,10 @@ class SQLConf extends Serializable with Logging {
def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE)
+ def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
+
+ def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org