You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/19 01:16:22 UTC
spark git commit: [SPARK-24854][SQL] Gathering all Avro options into
the AvroOptions class
Repository: spark
Updated Branches:
refs/heads/master 753f11516 -> cd5d93c0e
[SPARK-24854][SQL] Gathering all Avro options into the AvroOptions class
## What changes were proposed in this pull request?
In this PR, I propose to gather all `Avro` options into a new class, `AvroOptions`, in the same way as is done for the other datasources, `JSON` and `CSV`.
## How was this patch tested?
It was tested with `AvroSuite`.
Author: Maxim Gekk <ma...@databricks.com>
Closes #21810 from MaxGekk/avro-options.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd5d93c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd5d93c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd5d93c0
Branch: refs/heads/master
Commit: cd5d93c0e4ec4573126c6cdda3362814976d11eb
Parents: 753f1151
Author: Maxim Gekk <ma...@databricks.com>
Authored: Thu Jul 19 09:16:16 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Thu Jul 19 09:16:16 2018 +0800
----------------------------------------------------------------------
.../apache/spark/sql/avro/AvroFileFormat.scala | 13 +++---
.../org/apache/spark/sql/avro/AvroOptions.scala | 48 ++++++++++++++++++++
.../org/apache/spark/sql/avro/AvroSuite.scala | 6 ++-
3 files changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d93c0/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 9eb2064..1d0f40e 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -58,6 +58,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val conf = spark.sparkContext.hadoopConfiguration
+ val parsedOptions = new AvroOptions(options)
// Schema evolution is not supported yet. Here we only pick a single random sample file to
// figure out the schema of the whole dataset.
@@ -76,7 +77,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
}
// User can specify an optional avro json schema.
- val avroSchema = options.get(AvroFileFormat.AvroSchema)
+ val avroSchema = parsedOptions.schema
.map(new Schema.Parser().parse)
.getOrElse {
val in = new FsInput(sampleFile.getPath, conf)
@@ -114,10 +115,9 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- val recordName = options.getOrElse("recordName", "topLevelRecord")
- val recordNamespace = options.getOrElse("recordNamespace", "")
+ val parsedOptions = new AvroOptions(options)
val outputAvroSchema = SchemaConverters.toAvroType(
- dataSchema, nullable = false, recordName, recordNamespace)
+ dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
AvroJob.setOutputKeySchema(job, outputAvroSchema)
val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
@@ -160,11 +160,12 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
val broadcastedConf =
spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+ val parsedOptions = new AvroOptions(options)
(file: PartitionedFile) => {
val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
val conf = broadcastedConf.value.value
- val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+ val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
// TODO Removes this check once `FileFormat` gets a general file filtering interface method.
// Doing input file filtering is improper because we may generate empty tasks that process no
@@ -235,8 +236,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
private[avro] object AvroFileFormat {
val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
- val AvroSchema = "avroSchema"
-
class SerializableConfiguration(@transient var value: Configuration)
extends Serializable with KryoSerializable {
@transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d93c0/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
new file mode 100644
index 0000000..8721eae
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for Avro Reader and Writer stored in a case-insensitive manner.
+ */
+class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
+ extends Logging with Serializable {
+
+ def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+ /**
+ * Optional schema provided by a user in JSON format.
+ */
+ val schema: Option[String] = parameters.get("avroSchema")
+
+ /**
+ * Top level record name in write result, which is required in Avro spec.
+ * See https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
+ * Default value is "topLevelRecord"
+ */
+ val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
+
+ /**
+ * Record namespace in write result. Default value is "".
+ * See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
+ */
+ val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d93c0/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 446b421..f7e9877 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -578,7 +578,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
""".stripMargin
val result = spark
.read
- .option(AvroFileFormat.AvroSchema, avroSchema)
+ .option("avroSchema", avroSchema)
.avro(testAvro)
.collect()
val expected = spark.read.avro(testAvro).select("string").collect()
@@ -598,7 +598,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
| }]
|}
""".stripMargin
- val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema)
+ val result = spark
+ .read
+ .option("avroSchema", avroSchema)
.avro(testAvro).select("missingField").first
assert(result === Row("foo"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org