Posted to commits@spark.apache.org by li...@apache.org on 2018/07/12 20:55:31 UTC

[2/2] spark git commit: [SPARK-24768][SQL] Have a built-in AVRO data source implementation

[SPARK-24768][SQL] Have a built-in AVRO data source implementation

## What changes were proposed in this pull request?

Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystem, especially for Kafka-based data pipelines. Using the external package https://github.com/databricks/spark-avro, Spark SQL can read and write Avro data. Making spark-avro built-in provides a better experience for first-time users of Spark SQL and Structured Streaming, and we expect the built-in Avro data source to further improve the adoption of Structured Streaming.
The proposal is to inline the code from the spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.

[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)
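
A minimal usage sketch of the built-in source, which is registered under the short name "avro" (paths and the app name are illustrative, not part of this patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("avro-example").getOrCreate()

    // Read Avro files with the built-in data source.
    val df = spark.read.format("avro").load("/path/to/input")

    // Write the DataFrame back out as Avro.
    df.write.format("avro").save("/path/to/output")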

## How was this patch tested?

Unit test

Author: Gengliang Wang <ge...@databricks.com>

Closes #21742 from gengliangwang/export_avro.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/395860a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/395860a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/395860a9

Branch: refs/heads/master
Commit: 395860a986987886df6d60fd9b26afd818b2cb39
Parents: 1055c94
Author: Gengliang Wang <ge...@databricks.com>
Authored: Thu Jul 12 13:55:25 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu Jul 12 13:55:25 2018 -0700

----------------------------------------------------------------------
 dev/run-tests.py                                |   2 +-
 dev/sparktestsupport/modules.py                 |  10 +
 external/avro/pom.xml                           |  73 ++
 ....apache.spark.sql.sources.DataSourceRegister |   1 +
 .../apache/spark/sql/avro/AvroFileFormat.scala  | 289 +++++++
 .../spark/sql/avro/AvroOutputWriter.scala       | 164 ++++
 .../sql/avro/AvroOutputWriterFactory.scala      |  38 +
 .../spark/sql/avro/SchemaConverters.scala       | 406 ++++++++++
 .../org/apache/spark/sql/avro/package.scala     |  39 +
 external/avro/src/test/resources/episodes.avro  | Bin 0 -> 597 bytes
 .../avro/src/test/resources/log4j.properties    |  49 ++
 .../test-random-partitioned/part-r-00000.avro   | Bin 0 -> 1768 bytes
 .../test-random-partitioned/part-r-00001.avro   | Bin 0 -> 2313 bytes
 .../test-random-partitioned/part-r-00002.avro   | Bin 0 -> 1621 bytes
 .../test-random-partitioned/part-r-00003.avro   | Bin 0 -> 2117 bytes
 .../test-random-partitioned/part-r-00004.avro   | Bin 0 -> 3282 bytes
 .../test-random-partitioned/part-r-00005.avro   | Bin 0 -> 1550 bytes
 .../test-random-partitioned/part-r-00006.avro   | Bin 0 -> 1729 bytes
 .../test-random-partitioned/part-r-00007.avro   | Bin 0 -> 1897 bytes
 .../test-random-partitioned/part-r-00008.avro   | Bin 0 -> 3420 bytes
 .../test-random-partitioned/part-r-00009.avro   | Bin 0 -> 1796 bytes
 .../test-random-partitioned/part-r-00010.avro   | Bin 0 -> 3872 bytes
 external/avro/src/test/resources/test.avro      | Bin 0 -> 1365 bytes
 external/avro/src/test/resources/test.avsc      |  53 ++
 external/avro/src/test/resources/test.json      |  42 +
 .../org/apache/spark/sql/avro/AvroSuite.scala   | 812 +++++++++++++++++++
 .../avro/SerializableConfigurationSuite.scala   |  50 ++
 .../org/apache/spark/sql/avro/TestUtils.scala   | 156 ++++
 pom.xml                                         |   1 +
 project/SparkBuild.scala                        |  12 +-
 30 files changed, 2191 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index cd45908..d9d3789 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules):
     ['graphx', 'examples']
     >>> x = [x.name for x in determine_modules_to_test([modules.sql])]
     >>> x # doctest: +NORMALIZE_WHITESPACE
-    ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
+    ['sql', 'avro', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
      'pyspark-sql', 'repl', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
     """
     modules_to_test = set()

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index dfea762..2aa3555 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -170,6 +170,16 @@ hive_thriftserver = Module(
     ]
 )
 
+avro = Module(
+    name="avro",
+    dependencies=[sql],
+    source_file_regexes=[
+        "external/avro",
+    ],
+    sbt_test_goals=[
+        "avro/test",
+    ]
+)
 
 sql_kafka = Module(
     name="sql-kafka-0-10",

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/pom.xml
----------------------------------------------------------------------
diff --git a/external/avro/pom.xml b/external/avro/pom.xml
new file mode 100644
index 0000000..42e865b
--- /dev/null
+++ b/external/avro/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-sql-avro_2.11</artifactId>
+  <properties>
+    <sbt.project.name>avro</sbt.project.name>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Avro</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-tags_${scala.binary.version}</artifactId>
+    </dependency>
+  </dependencies>
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..95835f0
--- /dev/null
+++ b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.avro.AvroFileFormat

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
new file mode 100755
index 0000000..46e5a18
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.net.URI
+import java.util.zip.Deflater
+
+import scala.util.control.NonFatal
+
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.file.{DataFileConstants, DataFileReader}
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
+import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapreduce.AvroJob
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
+  private val log = LoggerFactory.getLogger(getClass)
+
+  override def equals(other: Any): Boolean = other match {
+    case _: AvroFileFormat => true
+    case _ => false
+  }
+
+  // Dummy hashCode() to appease ScalaStyle.
+  override def hashCode(): Int = super.hashCode()
+
+  override def inferSchema(
+      spark: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val conf = spark.sparkContext.hadoopConfiguration
+
+    // Schema evolution is not supported yet. Here we only pick a single random sample file to
+    // figure out the schema of the whole dataset.
+    val sampleFile =
+      if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
+        files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+          throw new FileNotFoundException(
+            "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
+              " is set to true. Do all input files have \".avro\" extension?"
+          )
+        }
+      } else {
+        files.headOption.getOrElse {
+          throw new FileNotFoundException("No Avro files found.")
+        }
+      }
+
+    // User can specify an optional avro json schema.
+    val avroSchema = options.get(AvroFileFormat.AvroSchema)
+      .map(new Schema.Parser().parse)
+      .getOrElse {
+        val in = new FsInput(sampleFile.getPath, conf)
+        try {
+          val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
+          try {
+            reader.getSchema
+          } finally {
+            reader.close()
+          }
+        } finally {
+          in.close()
+        }
+      }
+
+    SchemaConverters.toSqlType(avroSchema).dataType match {
+      case t: StructType => Some(t)
+      case _ => throw new RuntimeException(
+        s"""Avro schema cannot be converted to a Spark SQL StructType:
+           |
+           |${avroSchema.toString(true)}
+           |""".stripMargin)
+    }
+  }
+
+  override def shortName(): String = "avro"
+
+  override def isSplitable(
+      sparkSession: SparkSession,
+      options: Map[String, String],
+      path: Path): Boolean = true
+
+  override def prepareWrite(
+      spark: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+    val recordName = options.getOrElse("recordName", "topLevelRecord")
+    val recordNamespace = options.getOrElse("recordNamespace", "")
+    val build = SchemaBuilder.record(recordName).namespace(recordNamespace)
+    val outputAvroSchema = SchemaConverters.convertStructToAvro(dataSchema, build, recordNamespace)
+
+    AvroJob.setOutputKeySchema(job, outputAvroSchema)
+    val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
+    val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
+    val COMPRESS_KEY = "mapred.output.compress"
+
+    spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+      case "uncompressed" =>
+        log.info("writing uncompressed Avro records")
+        job.getConfiguration.setBoolean(COMPRESS_KEY, false)
+
+      case "snappy" =>
+        log.info("compressing Avro output using Snappy")
+        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
+
+      case "deflate" =>
+        val deflateLevel = spark.conf.get(
+          AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+        log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+        job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+        job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
+        job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+
+      case unknown: String =>
+        log.error(s"unsupported compression codec $unknown")
+    }
+
+    new AvroOutputWriterFactory(dataSchema, recordName, recordNamespace)
+  }
+
+  override def buildReader(
+      spark: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+    val broadcastedConf =
+      spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+
+    (file: PartitionedFile) => {
+      val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
+      val conf = broadcastedConf.value.value
+      val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+
+      // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
+      // Doing input file filtering is improper because we may generate empty tasks that process no
+      // input files but stress the scheduler. We should probably add a more general input file
+      // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
+      if (
+        conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
+        !file.filePath.endsWith(".avro")
+      ) {
+        Iterator.empty
+      } else {
+        val reader = {
+          val in = new FsInput(new Path(new URI(file.filePath)), conf)
+          try {
+            val datumReader = userProvidedSchema match {
+              case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
+              case _ => new GenericDatumReader[GenericRecord]()
+            }
+            DataFileReader.openReader(in, datumReader)
+          } catch {
+            case NonFatal(e) =>
+              log.error("Exception while opening DataFileReader", e)
+              in.close()
+              throw e
+          }
+        }
+
+        // Ensure that the reader is closed even if the task fails or doesn't consume the entire
+        // iterator of records.
+        Option(TaskContext.get()).foreach { taskContext =>
+          taskContext.addTaskCompletionListener { _ =>
+            reader.close()
+          }
+        }
+
+        reader.sync(file.start)
+        val stop = file.start + file.length
+
+        val rowConverter = SchemaConverters.createConverterToSQL(
+          userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
+
+        new Iterator[InternalRow] {
+          // Used to convert `Row`s containing data columns into `InternalRow`s.
+          private val encoderForDataColumns = RowEncoder(requiredSchema)
+
+          private[this] var completed = false
+
+          override def hasNext: Boolean = {
+            if (completed) {
+              false
+            } else {
+              val r = reader.hasNext && !reader.pastSync(stop)
+              if (!r) {
+                reader.close()
+                completed = true
+              }
+              r
+            }
+          }
+
+          override def next(): InternalRow = {
+            if (reader.pastSync(stop)) {
+              throw new NoSuchElementException("next on empty iterator")
+            }
+            val record = reader.next()
+            val safeDataRow = rowConverter(record).asInstanceOf[GenericRow]
+
+            // The safeDataRow is reused, we must do a copy
+            encoderForDataColumns.toRow(safeDataRow)
+          }
+        }
+      }
+    }
+  }
+}
+
+private[avro] object AvroFileFormat {
+  val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
+
+  val AvroSchema = "avroSchema"
+
+  class SerializableConfiguration(@transient var value: Configuration)
+      extends Serializable with KryoSerializable {
+    @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
+
+    private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+      out.defaultWriteObject()
+      value.write(out)
+    }
+
+    private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+      value = new Configuration(false)
+      value.readFields(in)
+    }
+
+    private def tryOrIOException[T](block: => T): T = {
+      try {
+        block
+      } catch {
+        case e: IOException =>
+          log.error("Exception encountered", e)
+          throw e
+        case NonFatal(e) =>
+          log.error("Exception encountered", e)
+          throw new IOException(e)
+      }
+    }
+
+    def write(kryo: Kryo, out: Output): Unit = {
+      val dos = new DataOutputStream(out)
+      value.write(dos)
+      dos.flush()
+    }
+
+    def read(kryo: Kryo, in: Input): Unit = {
+      value = new Configuration(false)
+      value.readFields(new DataInputStream(in))
+    }
+  }
+}

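A hedged sketch of the read and write options handled by AvroFileFormat above; the paths, schema contents, and namespace are illustrative, and `spark` is assumed to be an existing SparkSession:

    // Files without the ".avro" extension are skipped by default; disable that
    // behavior via this Hadoop option.
    spark.sparkContext.hadoopConfiguration
      .setBoolean("avro.mapred.ignore.inputs.without.extension", false)

    // Supply an explicit Avro JSON schema via the "avroSchema" option instead of
    // inferring one from a sample file.
    val avroSchemaJson =
      """{"type": "record", "name": "topLevelRecord",
        | "fields": [{"name": "id", "type": "long"}]}""".stripMargin
    val df = spark.read
      .format("avro")
      .option("avroSchema", avroSchemaJson)
      .load("/path/to/input")

    // Output compression is controlled by SQL confs; record naming by write options.
    spark.conf.set("spark.sql.avro.compression.codec", "deflate")
    spark.conf.set("spark.sql.avro.deflate.level", "5")
    df.write
      .format("avro")
      .option("recordName", "topLevelRecord")
      .option("recordNamespace", "com.example")
      .save("/path/to/output")
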
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
new file mode 100644
index 0000000..830bf3c
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.{IOException, OutputStream}
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.util.HashMap
+
+import scala.collection.immutable.Map
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.mapred.AvroKey
+import org.apache.avro.mapreduce.AvroKeyOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.types._
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[avro] class AvroOutputWriter(
+    path: String,
+    context: TaskAttemptContext,
+    schema: StructType,
+    recordName: String,
+    recordNamespace: String) extends OutputWriter {
+
+  private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace)
+  // copy of the old conversion logic after api change in SPARK-19085
+  private lazy val internalRowConverter =
+    CatalystTypeConverters.createToScalaConverter(schema).asInstanceOf[InternalRow => Row]
+
+  /**
+   * Overrides the couple of methods responsible for generating the output streams / files so
+   * that the data can be correctly partitioned
+   */
+  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
+    new AvroKeyOutputFormat[GenericRecord]() {
+
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+        new Path(path)
+      }
+
+      @throws(classOf[IOException])
+      override def getAvroFileOutputStream(c: TaskAttemptContext): OutputStream = {
+        val path = getDefaultWorkFile(context, ".avro")
+        path.getFileSystem(context.getConfiguration).create(path)
+      }
+
+    }.getRecordWriter(context)
+
+  override def write(internalRow: InternalRow): Unit = {
+    val row = internalRowConverter(internalRow)
+    val key = new AvroKey(converter(row).asInstanceOf[GenericRecord])
+    recordWriter.write(key, NullWritable.get())
+  }
+
+  override def close(): Unit = recordWriter.close(context)
+
+  /**
+   * This function constructs converter function for a given sparkSQL datatype. This is used in
+   * writing Avro records out to disk
+   */
+  private def createConverterToAvro(
+      dataType: DataType,
+      structName: String,
+      recordNamespace: String): (Any) => Any = {
+    dataType match {
+      case BinaryType => (item: Any) => item match {
+        case null => null
+        case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
+      }
+      case ByteType | ShortType | IntegerType | LongType |
+           FloatType | DoubleType | StringType | BooleanType => identity
+      case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+      case TimestampType => (item: Any) =>
+        if (item == null) null else item.asInstanceOf[Timestamp].getTime
+      case DateType => (item: Any) =>
+        if (item == null) null else item.asInstanceOf[Date].getTime
+      case ArrayType(elementType, _) =>
+        val elementConverter = createConverterToAvro(
+          elementType,
+          structName,
+          SchemaConverters.getNewRecordNamespace(elementType, recordNamespace, structName))
+        (item: Any) => {
+          if (item == null) {
+            null
+          } else {
+            val sourceArray = item.asInstanceOf[Seq[Any]]
+            val sourceArraySize = sourceArray.size
+            val targetArray = new Array[Any](sourceArraySize)
+            var idx = 0
+            while (idx < sourceArraySize) {
+              targetArray(idx) = elementConverter(sourceArray(idx))
+              idx += 1
+            }
+            targetArray
+          }
+        }
+      case MapType(StringType, valueType, _) =>
+        val valueConverter = createConverterToAvro(
+          valueType,
+          structName,
+          SchemaConverters.getNewRecordNamespace(valueType, recordNamespace, structName))
+        (item: Any) => {
+          if (item == null) {
+            null
+          } else {
+            val javaMap = new HashMap[String, Any]()
+            item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
+              javaMap.put(key, valueConverter(value))
+            }
+            javaMap
+          }
+        }
+      case structType: StructType =>
+        val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
+        val schema: Schema = SchemaConverters.convertStructToAvro(
+          structType, builder, recordNamespace)
+        val fieldConverters = structType.fields.map(field =>
+          createConverterToAvro(
+            field.dataType,
+            field.name,
+            SchemaConverters.getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
+        (item: Any) => {
+          if (item == null) {
+            null
+          } else {
+            val record = new Record(schema)
+            val convertersIterator = fieldConverters.iterator
+            val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
+            val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+
+            while (convertersIterator.hasNext) {
+              val converter = convertersIterator.next()
+              record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
+            }
+            record
+          }
+        }
+    }
+  }
+}

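A small worked illustration of the value-level conversions the writer applies, mirroring the private createConverterToAvro above rather than calling it (sample values are illustrative):

    import java.nio.ByteBuffer
    import java.sql.{Date, Timestamp}

    // TimestampType and DateType values are written as Avro longs (epoch milliseconds).
    val tsAsAvro: Long = new Timestamp(1531428925000L).getTime
    val dateAsAvro: Long = Date.valueOf("2018-07-12").getTime

    // DecimalType values are written as their string representation.
    val decimalAsAvro: String = BigDecimal("3.14").toString

    // BinaryType values are wrapped into a ByteBuffer.
    val bytesAsAvro: ByteBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3))
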
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
new file mode 100644
index 0000000..5b2ce7d
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroOutputWriterFactory(
+    schema: StructType,
+    recordName: String,
+    recordNamespace: String) extends OutputWriterFactory {
+
+  override def getFileExtension(context: TaskAttemptContext): String = ".avro"
+
+  override def newInstance(
+      path: String,
+      dataSchema: StructType,
+      context: TaskAttemptContext): OutputWriter = {
+    new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
new file mode 100644
index 0000000..01f8c74
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.SchemaBuilder._
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.GenericFixed
+
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains method that are used to convert sparkSQL schemas to avro schemas and vice
+ * versa.
+ */
+object SchemaConverters {
+
+  class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+
+  case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * This function takes an avro schema and returns a sql schema.
+   */
+  def toSqlType(avroSchema: Schema): SchemaType = {
+    avroSchema.getType match {
+      case INT => SchemaType(IntegerType, nullable = false)
+      case STRING => SchemaType(StringType, nullable = false)
+      case BOOLEAN => SchemaType(BooleanType, nullable = false)
+      case BYTES => SchemaType(BinaryType, nullable = false)
+      case DOUBLE => SchemaType(DoubleType, nullable = false)
+      case FLOAT => SchemaType(FloatType, nullable = false)
+      case LONG => SchemaType(LongType, nullable = false)
+      case FIXED => SchemaType(BinaryType, nullable = false)
+      case ENUM => SchemaType(StringType, nullable = false)
+
+      case RECORD =>
+        val fields = avroSchema.getFields.asScala.map { f =>
+          val schemaType = toSqlType(f.schema())
+          StructField(f.name, schemaType.dataType, schemaType.nullable)
+        }
+
+        SchemaType(StructType(fields), nullable = false)
+
+      case ARRAY =>
+        val schemaType = toSqlType(avroSchema.getElementType)
+        SchemaType(
+          ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
+          nullable = false)
+
+      case MAP =>
+        val schemaType = toSqlType(avroSchema.getValueType)
+        SchemaType(
+          MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
+          nullable = false)
+
+      case UNION =>
+        if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+          // In case of a union with null, eliminate it and make a recursive call
+          val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+          if (remainingUnionTypes.size == 1) {
+            toSqlType(remainingUnionTypes.head).copy(nullable = true)
+          } else {
+            toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true)
+          }
+        } else avroSchema.getTypes.asScala.map(_.getType) match {
+          case Seq(t1) =>
+            toSqlType(avroSchema.getTypes.get(0))
+          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
+            SchemaType(LongType, nullable = false)
+          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
+            SchemaType(DoubleType, nullable = false)
+          case _ =>
+            // Convert complex unions to struct types where field names are member0, member1, etc.
+            // This is consistent with the behavior when converting between Avro and Parquet.
+            val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
+              case (s, i) =>
+                val schemaType = toSqlType(s)
+                // All fields are nullable because only one of them is set at a time
+                StructField(s"member$i", schemaType.dataType, nullable = true)
+            }
+
+            SchemaType(StructType(fields), nullable = false)
+        }
+
+      case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
+    }
+  }
+
+  /**
+   * This function converts sparkSQL StructType into avro schema. This method uses two other
+   * converter methods in order to do the conversion.
+   */
+  def convertStructToAvro[T](
+      structType: StructType,
+      schemaBuilder: RecordBuilder[T],
+      recordNamespace: String): T = {
+    val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields()
+    structType.fields.foreach { field =>
+      val newField = fieldsAssembler.name(field.name).`type`()
+
+      if (field.nullable) {
+        convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace)
+          .noDefault
+      } else {
+        convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace)
+          .noDefault
+      }
+    }
+    fieldsAssembler.endRecord()
+  }
+
+  /**
+   * Returns a converter function to convert row in avro format to GenericRow of catalyst.
+   *
+   * @param sourceAvroSchema Source schema before conversion inferred from avro file by passed in
+   *                       by user.
+   * @param targetSqlType Target catalyst sql type after the conversion.
+   * @return returns a converter function to convert row in avro format to GenericRow of catalyst.
+   */
+  private[avro] def createConverterToSQL(
+    sourceAvroSchema: Schema,
+    targetSqlType: DataType): AnyRef => AnyRef = {
+
+    def createConverter(avroSchema: Schema,
+        sqlType: DataType, path: List[String]): AnyRef => AnyRef = {
+      val avroType = avroSchema.getType
+      (sqlType, avroType) match {
+        // Avro strings are in Utf8, so we have to call toString on them
+        case (StringType, STRING) | (StringType, ENUM) =>
+          (item: AnyRef) => item.toString
+        // Byte arrays are reused by avro, so we have to make a copy of them.
+        case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) |
+             (FloatType, FLOAT) | (LongType, LONG) =>
+          identity
+        case (TimestampType, LONG) =>
+          (item: AnyRef) => new Timestamp(item.asInstanceOf[Long])
+        case (DateType, LONG) =>
+          (item: AnyRef) => new Date(item.asInstanceOf[Long])
+        case (BinaryType, FIXED) =>
+          (item: AnyRef) => item.asInstanceOf[GenericFixed].bytes().clone()
+        case (BinaryType, BYTES) =>
+          (item: AnyRef) =>
+            val byteBuffer = item.asInstanceOf[ByteBuffer]
+            val bytes = new Array[Byte](byteBuffer.remaining)
+            byteBuffer.get(bytes)
+            bytes
+        case (struct: StructType, RECORD) =>
+          val length = struct.fields.length
+          val converters = new Array[AnyRef => AnyRef](length)
+          val avroFieldIndexes = new Array[Int](length)
+          var i = 0
+          while (i < length) {
+            val sqlField = struct.fields(i)
+            val avroField = avroSchema.getField(sqlField.name)
+            if (avroField != null) {
+              val converter = (item: AnyRef) => {
+                if (item == null) {
+                  item
+                } else {
+                  createConverter(avroField.schema, sqlField.dataType, path :+ sqlField.name)(item)
+                }
+              }
+              converters(i) = converter
+              avroFieldIndexes(i) = avroField.pos()
+            } else if (!sqlField.nullable) {
+              throw new IncompatibleSchemaException(
+                s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
+                  "in Avro schema\n" +
+                  s"Source Avro schema: $sourceAvroSchema.\n" +
+                  s"Target Catalyst type: $targetSqlType")
+            }
+            i += 1
+          }
+
+          (item: AnyRef) =>
+            val record = item.asInstanceOf[GenericRecord]
+            val result = new Array[Any](length)
+            var i = 0
+            while (i < converters.length) {
+              if (converters(i) != null) {
+                val converter = converters(i)
+                result(i) = converter(record.get(avroFieldIndexes(i)))
+              }
+              i += 1
+            }
+            new GenericRow(result)
+        case (arrayType: ArrayType, ARRAY) =>
+          val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType,
+            path)
+          val allowsNull = arrayType.containsNull
+          (item: AnyRef) =>
+            item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element =>
+              if (element == null && !allowsNull) {
+                throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " +
+                  "allowed to be null")
+              } else {
+                elementConverter(element)
+              }
+            }
+        case (mapType: MapType, MAP) if mapType.keyType == StringType =>
+          val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path)
+          val allowsNull = mapType.valueContainsNull
+          (item: AnyRef) =>
+            item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { case (k, v) =>
+              if (v == null && !allowsNull) {
+                throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " +
+                  "allowed to be null")
+              } else {
+                (k.toString, valueConverter(v))
+              }
+            }.toMap
+        case (sqlType, UNION) =>
+          if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+            val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+            if (remainingUnionTypes.size == 1) {
+              createConverter(remainingUnionTypes.head, sqlType, path)
+            } else {
+              createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
+            }
+          } else avroSchema.getTypes.asScala.map(_.getType) match {
+            case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
+            case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
+              (item: AnyRef) =>
+                item match {
+                  case l: java.lang.Long => l
+                  case i: java.lang.Integer => new java.lang.Long(i.longValue())
+                }
+            case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType =>
+              (item: AnyRef) =>
+                item match {
+                  case d: java.lang.Double => d
+                  case f: java.lang.Float => new java.lang.Double(f.doubleValue())
+                }
+            case other =>
+              sqlType match {
+                case t: StructType if t.fields.length == avroSchema.getTypes.size =>
+                  val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map {
+                    case (field, schema) =>
+                      createConverter(schema, field.dataType, path :+ field.name)
+                  }
+                  (item: AnyRef) =>
+                    val i = GenericData.get().resolveUnion(avroSchema, item)
+                    val converted = new Array[Any](fieldConverters.length)
+                    converted(i) = fieldConverters(i)(item)
+                    new GenericRow(converted)
+                case _ => throw new IncompatibleSchemaException(
+                  s"Cannot convert Avro schema to catalyst type because schema at path " +
+                    s"${path.mkString(".")} is not compatible " +
+                    s"(avroType = $other, sqlType = $sqlType). \n" +
+                    s"Source Avro schema: $sourceAvroSchema.\n" +
+                    s"Target Catalyst type: $targetSqlType")
+              }
+          }
+        case (left, right) =>
+          throw new IncompatibleSchemaException(
+            s"Cannot convert Avro schema to catalyst type because schema at path " +
+              s"${path.mkString(".")} is not compatible (avroType = $right, sqlType = $left). \n" +
+              s"Source Avro schema: $sourceAvroSchema.\n" +
+              s"Target Catalyst type: $targetSqlType")
+      }
+    }
+    createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
+  }
+
+  /**
+   * This function is used to convert some sparkSQL type to avro type. Note that this function won't
+   * be used to construct fields of avro record (convertFieldTypeToAvro is used for that).
+   */
+  private def convertTypeToAvro[T](
+      dataType: DataType,
+      schemaBuilder: BaseTypeBuilder[T],
+      structName: String,
+      recordNamespace: String): T = {
+    dataType match {
+      case ByteType => schemaBuilder.intType()
+      case ShortType => schemaBuilder.intType()
+      case IntegerType => schemaBuilder.intType()
+      case LongType => schemaBuilder.longType()
+      case FloatType => schemaBuilder.floatType()
+      case DoubleType => schemaBuilder.doubleType()
+      case _: DecimalType => schemaBuilder.stringType()
+      case StringType => schemaBuilder.stringType()
+      case BinaryType => schemaBuilder.bytesType()
+      case BooleanType => schemaBuilder.booleanType()
+      case TimestampType => schemaBuilder.longType()
+      case DateType => schemaBuilder.longType()
+
+      case ArrayType(elementType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+        val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace)
+        schemaBuilder.array().items(elementSchema)
+
+      case MapType(StringType, valueType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+        val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace)
+        schemaBuilder.map().values(valueSchema)
+
+      case structType: StructType =>
+        convertStructToAvro(
+          structType,
+          schemaBuilder.record(structName).namespace(recordNamespace),
+          recordNamespace)
+
+      case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+    }
+  }
+
+  /**
+   * This function is used to construct fields of the avro record, where schema of the field is
+   * specified by avro representation of dataType. Since builders for record fields are different
+   * from those for everything else, we have to use a separate method.
+   */
+  private def convertFieldTypeToAvro[T](
+      dataType: DataType,
+      newFieldBuilder: BaseFieldTypeBuilder[T],
+      structName: String,
+      recordNamespace: String): FieldDefault[T, _] = {
+    dataType match {
+      case ByteType => newFieldBuilder.intType()
+      case ShortType => newFieldBuilder.intType()
+      case IntegerType => newFieldBuilder.intType()
+      case LongType => newFieldBuilder.longType()
+      case FloatType => newFieldBuilder.floatType()
+      case DoubleType => newFieldBuilder.doubleType()
+      case _: DecimalType => newFieldBuilder.stringType()
+      case StringType => newFieldBuilder.stringType()
+      case BinaryType => newFieldBuilder.bytesType()
+      case BooleanType => newFieldBuilder.booleanType()
+      case TimestampType => newFieldBuilder.longType()
+      case DateType => newFieldBuilder.longType()
+
+      case ArrayType(elementType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+        val elementSchema = convertTypeToAvro(
+          elementType,
+          builder,
+          structName,
+          getNewRecordNamespace(elementType, recordNamespace, structName))
+        newFieldBuilder.array().items(elementSchema)
+
+      case MapType(StringType, valueType, _) =>
+        val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+        val valueSchema = convertTypeToAvro(
+          valueType,
+          builder,
+          structName,
+          getNewRecordNamespace(valueType, recordNamespace, structName))
+        newFieldBuilder.map().values(valueSchema)
+
+      case structType: StructType =>
+        convertStructToAvro(
+          structType,
+          newFieldBuilder.record(structName).namespace(s"$recordNamespace.$structName"),
+          s"$recordNamespace.$structName")
+
+      case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+    }
+  }
+
+  /**
+   * Returns a new namespace depending on the data type of the element.
+   * If the data type is a StructType it returns the current namespace concatenated
+   * with the element name, otherwise it returns the current namespace as it is.
+   */
+  private[avro] def getNewRecordNamespace(
+      elementDataType: DataType,
+      currentRecordNamespace: String,
+      elementName: String): String = {
+
+    elementDataType match {
+      case StructType(_) => s"$currentRecordNamespace.$elementName"
+      case _ => currentRecordNamespace
+    }
+  }
+
+  private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = {
+    if (isNullable) {
+      SchemaBuilder.builder().nullable()
+    } else {
+      SchemaBuilder.builder()
+    }
+  }
+}

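A minimal sketch of the two public conversion directions in SchemaConverters (the schema contents are illustrative):

    import org.apache.avro.{Schema, SchemaBuilder}
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // Avro -> Spark SQL: a union with null becomes a nullable field.
    val avroSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "example", "fields": [
        |  {"name": "id", "type": "long"},
        |  {"name": "name", "type": ["null", "string"]}
        |]}""".stripMargin)
    val sqlSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
    // StructType(StructField(id,LongType,false), StructField(name,StringType,true))

    // Spark SQL -> Avro: build an Avro record schema from a StructType.
    val structType = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("name", StringType, nullable = true)))
    val builder = SchemaBuilder.record("example").namespace("com.example")
    val outputAvroSchema: Schema =
      SchemaConverters.convertStructToAvro(structType, builder, "com.example")
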
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
new file mode 100755
index 0000000..b3c8a66
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+package object avro {
+  /**
+   * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
+   * the DataFileWriter
+   */
+  implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
+    def avro: String => Unit = writer.format("avro").save
+  }
+
+  /**
+   * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using
+   * the DataFileReader
+   */
+  implicit class AvroDataFrameReader(reader: DataFrameReader) {
+    def avro: String => DataFrame = reader.format("avro").load
+
+    @scala.annotation.varargs
+    def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
+  }
+}

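A short usage sketch of the implicit helpers above (paths are illustrative; `spark` is assumed to be an existing SparkSession):

    import org.apache.spark.sql.avro._

    // Equivalent to spark.read.format("avro").load(...)
    val df = spark.read.avro("/path/to/input")

    // Equivalent to df.write.format("avro").save(...)
    df.write.avro("/path/to/output")
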
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/episodes.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/episodes.avro b/external/avro/src/test/resources/episodes.avro
new file mode 100644
index 0000000..58a028c
Binary files /dev/null and b/external/avro/src/test/resources/episodes.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c18a724
--- /dev/null
+++ b/external/avro/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Set the logger level of File Appender to WARN
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
new file mode 100755
index 0000000..fece892
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
new file mode 100755
index 0000000..1ca623a
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
new file mode 100755
index 0000000..a12e945
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
new file mode 100755
index 0000000..60c0956
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
new file mode 100755
index 0000000..af56dfc
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
new file mode 100755
index 0000000..87d7844
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
new file mode 100755
index 0000000..c326fc4
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
new file mode 100755
index 0000000..279f36c
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
new file mode 100755
index 0000000..8d70f5d
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
new file mode 100755
index 0000000..6839d72
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
new file mode 100755
index 0000000..aedc7f7
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.avro b/external/avro/src/test/resources/test.avro
new file mode 100644
index 0000000..6425e21
Binary files /dev/null and b/external/avro/src/test/resources/test.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.avsc
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.avsc b/external/avro/src/test/resources/test.avsc
new file mode 100644
index 0000000..d7119a0
--- /dev/null
+++ b/external/avro/src/test/resources/test.avsc
@@ -0,0 +1,53 @@
+{
+  "type" : "record",
+  "name" : "test_schema",
+  "fields" : [{
+    "name" : "string",
+    "type" : "string",
+    "doc"  : "Meaningless string of characters"
+  }, {
+    "name" : "simple_map",
+    "type" : {"type": "map", "values": "int"}
+  }, {
+    "name" : "complex_map",
+    "type" : {"type": "map", "values": {"type": "map", "values": "string"}}
+  }, {
+    "name" : "union_string_null",
+    "type" : ["null", "string"]
+  }, {
+    "name" : "union_int_long_null",
+    "type" : ["int", "long", "null"]
+  }, {
+    "name" : "union_float_double",
+    "type" : ["float", "double"]
+  }, {
+    "name": "fixed3",
+    "type": {"type": "fixed", "size": 3, "name": "fixed3"}
+  }, {
+    "name": "fixed2",
+    "type": {"type": "fixed", "size": 2, "name": "fixed2"}
+  }, {
+    "name": "enum",
+    "type": { "type": "enum",
+              "name": "Suit",
+              "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+            }
+  }, {
+    "name": "record",
+    "type": {
+      "type": "record", 
+      "name": "record",
+      "aliases": ["RecordAlias"],
+      "fields" : [{
+        "name": "value_field",
+        "type": "string"
+      }]
+    }
+  }, {
+    "name": "array_of_boolean",
+    "type": {"type": "array", "items": "boolean"}
+  }, {
+    "name": "bytes",
+    "type": "bytes"
+  }]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.json
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.json b/external/avro/src/test/resources/test.json
new file mode 100644
index 0000000..780189a
--- /dev/null
+++ b/external/avro/src/test/resources/test.json
@@ -0,0 +1,42 @@
+{
+	"string": "OMG SPARK IS AWESOME",
+	"simple_map": {"abc": 1, "bcd": 7},
+	"complex_map": {"key": {"a": "b", "c": "d"}},
+	"union_string_null": {"string": "abc"},
+	"union_int_long_null": {"int": 1},
+	"union_float_double": {"float": 3.1415926535},
+	"fixed3":"\u0002\u0003\u0004",
+	"fixed2":"\u0011\u0012",
+	"enum": "SPADES",
+	"record": {"value_field": "Two things are infinite: the universe and human stupidity; and I'm not sure about universe."},
+	"array_of_boolean": [true, false, false],
+	"bytes": "\u0041\u0042\u0043"
+}
+{
+	"string": "Terran is IMBA!",
+	"simple_map": {"mmm": 0, "qqq": 66},
+	"complex_map": {"key": {"1": "2", "3": "4"}},
+	"union_string_null": {"string": "123"},
+	"union_int_long_null": {"long": 66},
+	"union_float_double": {"double": 6.6666666666666},
+	"fixed3":"\u0007\u0007\u0007",
+	"fixed2":"\u0001\u0002",
+	"enum": "CLUBS",
+	"record": {"value_field": "Life did not intend to make us perfect. Whoever is perfect belongs in a museum."},
+	"array_of_boolean": [],
+	"bytes": ""
+}
+{
+	"string": "The cake is a LIE!",
+	"simple_map": {},
+	"complex_map": {"key": {}},
+	"union_string_null": {"null": null},
+	"union_int_long_null": {"null": null},
+	"union_float_double": {"double": 0},
+	"fixed3":"\u0011\u0022\u0009",
+	"fixed2":"\u0010\u0090",
+	"enum": "DIAMONDS",
+	"record": {"value_field": "TEST_STR123"},
+	"array_of_boolean": [false],
+	"bytes": "\u0053"
+}

