Posted to commits@spark.apache.org by gu...@apache.org on 2018/07/19 01:16:22 UTC

spark git commit: [SPARK-24854][SQL] Gathering all Avro options into the AvroOptions class

Repository: spark
Updated Branches:
  refs/heads/master 753f11516 -> cd5d93c0e


[SPARK-24854][SQL] Gathering all Avro options into the AvroOptions class

## What changes were proposed in this pull request?

In the PR, I propose to put all `Avro` options into a new class `AvroOptions`, in the same way as it is done for other datasources such as `JSON` and `CSV`.
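
For illustration, a minimal sketch of how the new class consolidates option parsing. The class and option names come from the diff below; the option values themselves are sample values, not from the patch:

```scala
import org.apache.spark.sql.avro.AvroOptions

// Sample option map; the values are illustrative only.
val options = Map(
  "avroSchema"      -> """{"type": "record", "name": "topLevelRecord", "fields": []}""",
  "recordName"      -> "topLevelRecord",
  "recordNamespace" -> "com.example" // hypothetical namespace
)

// All Avro-specific options are now read through one case-insensitive accessor.
val parsedOptions = new AvroOptions(options)
parsedOptions.schema          // Option[String]: user-provided Avro schema in JSON, if any
parsedOptions.recordName      // String: defaults to "topLevelRecord"
parsedOptions.recordNamespace // String: defaults to ""
```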

## How was this patch tested?

It was tested by the existing `AvroSuite`.
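
The user-facing option names are unchanged. For reference, a sketch of the reader-side usage that `AvroSuite` exercises, assuming a `SparkSession` named `spark` and the implicit `.avro` reader extension from this external module; the path and schema are placeholders:

```scala
// Placeholders only: the schema and path are not taken from the patch.
val avroSchema =
  """{
    |  "type": "record",
    |  "name": "topLevelRecord",
    |  "fields": [{"name": "string", "type": "string"}]
    |}
  """.stripMargin

val result = spark
  .read
  .option("avroSchema", avroSchema) // picked up via AvroOptions.schema
  .avro("/path/to/test.avro")       // hypothetical path
  .collect()
```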

Author: Maxim Gekk <ma...@databricks.com>

Closes #21810 from MaxGekk/avro-options.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd5d93c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd5d93c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd5d93c0

Branch: refs/heads/master
Commit: cd5d93c0e4ec4573126c6cdda3362814976d11eb
Parents: 753f1151
Author: Maxim Gekk <ma...@databricks.com>
Authored: Thu Jul 19 09:16:16 2018 +0800
Committer: hyukjinkwon <gu...@apache.org>
Committed: Thu Jul 19 09:16:16 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 13 +++---
 .../org/apache/spark/sql/avro/AvroOptions.scala | 48 ++++++++++++++++++++
 .../org/apache/spark/sql/avro/AvroSuite.scala   |  6 ++-
 3 files changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d93c0/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index 9eb2064..1d0f40e 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -58,6 +58,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     val conf = spark.sparkContext.hadoopConfiguration
+    val parsedOptions = new AvroOptions(options)
 
     // Schema evolution is not supported yet. Here we only pick a single random sample file to
     // figure out the schema of the whole dataset.
@@ -76,7 +77,7 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       }
 
     // User can specify an optional avro json schema.
-    val avroSchema = options.get(AvroFileFormat.AvroSchema)
+    val avroSchema = parsedOptions.schema
       .map(new Schema.Parser().parse)
       .getOrElse {
         val in = new FsInput(sampleFile.getPath, conf)
@@ -114,10 +115,9 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val recordName = options.getOrElse("recordName", "topLevelRecord")
-    val recordNamespace = options.getOrElse("recordNamespace", "")
+    val parsedOptions = new AvroOptions(options)
     val outputAvroSchema = SchemaConverters.toAvroType(
-      dataSchema, nullable = false, recordName, recordNamespace)
+      dataSchema, nullable = false, parsedOptions.recordName, parsedOptions.recordNamespace)
 
     AvroJob.setOutputKeySchema(job, outputAvroSchema)
     val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
@@ -160,11 +160,12 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 
     val broadcastedConf =
       spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+    val parsedOptions = new AvroOptions(options)
 
     (file: PartitionedFile) => {
       val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
       val conf = broadcastedConf.value.value
-      val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+      val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
 
       // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
       // Doing input file filtering is improper because we may generate empty tasks that process no
@@ -235,8 +236,6 @@ private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
 private[avro] object AvroFileFormat {
   val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
 
-  val AvroSchema = "avroSchema"
-
   class SerializableConfiguration(@transient var value: Configuration)
       extends Serializable with KryoSerializable {
     @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d93c0/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
new file mode 100644
index 0000000..8721eae
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/**
+ * Options for Avro Reader and Writer stored in case insensitive manner.
+ */
+class AvroOptions(@transient val parameters: CaseInsensitiveMap[String])
+  extends Logging with Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  /**
+   * Optional schema provided by a user in JSON format.
+   */
+  val schema: Option[String] = parameters.get("avroSchema")
+
+  /**
+   * Top level record name in write result, which is required in Avro spec.
+   * See https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
+   * Default value is "topLevelRecord"
+   */
+  val recordName: String = parameters.getOrElse("recordName", "topLevelRecord")
+
+  /**
+   * Record namespace in write result. Default value is "".
+   * See Avro spec for details: https://avro.apache.org/docs/1.8.2/spec.html#schema_record .
+   */
+  val recordNamespace: String = parameters.getOrElse("recordNamespace", "")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d93c0/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 446b421..f7e9877 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -578,7 +578,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       """.stripMargin
     val result = spark
       .read
-      .option(AvroFileFormat.AvroSchema, avroSchema)
+      .option("avroSchema", avroSchema)
       .avro(testAvro)
       .collect()
     val expected = spark.read.avro(testAvro).select("string").collect()
@@ -598,7 +598,9 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
         |  }]
         |}
       """.stripMargin
-    val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema)
+    val result = spark
+      .read
+      .option("avroSchema", avroSchema)
       .avro(testAvro).select("missingField").first
     assert(result === Row("foo"))
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org