You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/07/12 20:55:31 UTC
[2/2] spark git commit: [SPARK-24768][SQL] Have a built-in AVRO data
source implementation
[SPARK-24768][SQL] Have a built-in AVRO data source implementation
## What changes were proposed in this pull request?
Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystem, especially for Kafka-based data pipelines. Using the external package https://github.com/databricks/spark-avro, Spark SQL can read and write Avro data. Making spark-avro built-in can provide a better experience for first-time users of Spark SQL and Structured Streaming. We expect that the built-in Avro data source will further improve the adoption of Structured Streaming.
The proposal is to inline code from the spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.
[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)
## How was this patch tested?
Unit test
Author: Gengliang Wang <ge...@databricks.com>
Closes #21742 from gengliangwang/export_avro.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/395860a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/395860a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/395860a9
Branch: refs/heads/master
Commit: 395860a986987886df6d60fd9b26afd818b2cb39
Parents: 1055c94
Author: Gengliang Wang <ge...@databricks.com>
Authored: Thu Jul 12 13:55:25 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu Jul 12 13:55:25 2018 -0700
----------------------------------------------------------------------
dev/run-tests.py | 2 +-
dev/sparktestsupport/modules.py | 10 +
external/avro/pom.xml | 73 ++
....apache.spark.sql.sources.DataSourceRegister | 1 +
.../apache/spark/sql/avro/AvroFileFormat.scala | 289 +++++++
.../spark/sql/avro/AvroOutputWriter.scala | 164 ++++
.../sql/avro/AvroOutputWriterFactory.scala | 38 +
.../spark/sql/avro/SchemaConverters.scala | 406 ++++++++++
.../org/apache/spark/sql/avro/package.scala | 39 +
external/avro/src/test/resources/episodes.avro | Bin 0 -> 597 bytes
.../avro/src/test/resources/log4j.properties | 49 ++
.../test-random-partitioned/part-r-00000.avro | Bin 0 -> 1768 bytes
.../test-random-partitioned/part-r-00001.avro | Bin 0 -> 2313 bytes
.../test-random-partitioned/part-r-00002.avro | Bin 0 -> 1621 bytes
.../test-random-partitioned/part-r-00003.avro | Bin 0 -> 2117 bytes
.../test-random-partitioned/part-r-00004.avro | Bin 0 -> 3282 bytes
.../test-random-partitioned/part-r-00005.avro | Bin 0 -> 1550 bytes
.../test-random-partitioned/part-r-00006.avro | Bin 0 -> 1729 bytes
.../test-random-partitioned/part-r-00007.avro | Bin 0 -> 1897 bytes
.../test-random-partitioned/part-r-00008.avro | Bin 0 -> 3420 bytes
.../test-random-partitioned/part-r-00009.avro | Bin 0 -> 1796 bytes
.../test-random-partitioned/part-r-00010.avro | Bin 0 -> 3872 bytes
external/avro/src/test/resources/test.avro | Bin 0 -> 1365 bytes
external/avro/src/test/resources/test.avsc | 53 ++
external/avro/src/test/resources/test.json | 42 +
.../org/apache/spark/sql/avro/AvroSuite.scala | 812 +++++++++++++++++++
.../avro/SerializableConfigurationSuite.scala | 50 ++
.../org/apache/spark/sql/avro/TestUtils.scala | 156 ++++
pom.xml | 1 +
project/SparkBuild.scala | 12 +-
30 files changed, 2191 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index cd45908..d9d3789 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules):
['graphx', 'examples']
>>> x = [x.name for x in determine_modules_to_test([modules.sql])]
>>> x # doctest: +NORMALIZE_WHITESPACE
- ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
+ ['sql', 'avro', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
'pyspark-sql', 'repl', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
"""
modules_to_test = set()
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index dfea762..2aa3555 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -170,6 +170,16 @@ hive_thriftserver = Module(
]
)
+avro = Module(
+ name="avro",
+ dependencies=[sql],
+ source_file_regexes=[
+ "external/avro",
+ ],
+ sbt_test_goals=[
+ "avro/test",
+ ]
+)
sql_kafka = Module(
name="sql-kafka-0-10",
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/pom.xml
----------------------------------------------------------------------
diff --git a/external/avro/pom.xml b/external/avro/pom.xml
new file mode 100644
index 0000000..42e865b
--- /dev/null
+++ b/external/avro/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-sql-avro_2.11</artifactId>
+ <properties>
+ <sbt.project.name>avro</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Avro</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..95835f0
--- /dev/null
+++ b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.avro.AvroFileFormat
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
new file mode 100755
index 0000000..46e5a18
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.net.URI
+import java.util.zip.Deflater
+
+import scala.util.control.NonFatal
+
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.file.{DataFileConstants, DataFileReader}
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
+import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapreduce.AvroJob
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
+ private val log = LoggerFactory.getLogger(getClass)
+
+ override def equals(other: Any): Boolean = other match {
+ case _: AvroFileFormat => true
+ case _ => false
+ }
+
+ // Dummy hashCode() to appease ScalaStyle.
+ override def hashCode(): Int = super.hashCode()
+
+ override def inferSchema(
+ spark: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val conf = spark.sparkContext.hadoopConfiguration
+
+ // Schema evolution is not supported yet. Here we only pick a single random sample file to
+ // figure out the schema of the whole dataset.
+ val sampleFile =
+ if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
+ files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+ throw new FileNotFoundException(
+ "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
+ " is set to true. Do all input files have \".avro\" extension?"
+ )
+ }
+ } else {
+ files.headOption.getOrElse {
+ throw new FileNotFoundException("No Avro files found.")
+ }
+ }
+
+ // User can specify an optional avro json schema.
+ val avroSchema = options.get(AvroFileFormat.AvroSchema)
+ .map(new Schema.Parser().parse)
+ .getOrElse {
+ val in = new FsInput(sampleFile.getPath, conf)
+ try {
+ val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
+ try {
+ reader.getSchema
+ } finally {
+ reader.close()
+ }
+ } finally {
+ in.close()
+ }
+ }
+
+ SchemaConverters.toSqlType(avroSchema).dataType match {
+ case t: StructType => Some(t)
+ case _ => throw new RuntimeException(
+ s"""Avro schema cannot be converted to a Spark SQL StructType:
+ |
+ |${avroSchema.toString(true)}
+ |""".stripMargin)
+ }
+ }
+
+ override def shortName(): String = "avro"
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = true
+
+ override def prepareWrite(
+ spark: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val recordName = options.getOrElse("recordName", "topLevelRecord")
+ val recordNamespace = options.getOrElse("recordNamespace", "")
+ val build = SchemaBuilder.record(recordName).namespace(recordNamespace)
+ val outputAvroSchema = SchemaConverters.convertStructToAvro(dataSchema, build, recordNamespace)
+
+ AvroJob.setOutputKeySchema(job, outputAvroSchema)
+ val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
+ val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
+ val COMPRESS_KEY = "mapred.output.compress"
+
+ spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+ case "uncompressed" =>
+ log.info("writing uncompressed Avro records")
+ job.getConfiguration.setBoolean(COMPRESS_KEY, false)
+
+ case "snappy" =>
+ log.info("compressing Avro output using Snappy")
+ job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+ job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
+
+ case "deflate" =>
+ val deflateLevel = spark.conf.get(
+ AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+ log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+ job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+ job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
+ job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+
+ case unknown: String =>
+ log.error(s"unsupported compression codec $unknown")
+ }
+
+ new AvroOutputWriterFactory(dataSchema, recordName, recordNamespace)
+ }
+
+ override def buildReader(
+ spark: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+ val broadcastedConf =
+ spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+
+ (file: PartitionedFile) => {
+ val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
+ val conf = broadcastedConf.value.value
+ val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+
+ // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
+ // Doing input file filtering is improper because we may generate empty tasks that process no
+ // input files but stress the scheduler. We should probably add a more general input file
+ // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
+ if (
+ conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
+ !file.filePath.endsWith(".avro")
+ ) {
+ Iterator.empty
+ } else {
+ val reader = {
+ val in = new FsInput(new Path(new URI(file.filePath)), conf)
+ try {
+ val datumReader = userProvidedSchema match {
+ case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
+ case _ => new GenericDatumReader[GenericRecord]()
+ }
+ DataFileReader.openReader(in, datumReader)
+ } catch {
+ case NonFatal(e) =>
+ log.error("Exception while opening DataFileReader", e)
+ in.close()
+ throw e
+ }
+ }
+
+ // Ensure that the reader is closed even if the task fails or doesn't consume the entire
+ // iterator of records.
+ Option(TaskContext.get()).foreach { taskContext =>
+ taskContext.addTaskCompletionListener { _ =>
+ reader.close()
+ }
+ }
+
+ reader.sync(file.start)
+ val stop = file.start + file.length
+
+ val rowConverter = SchemaConverters.createConverterToSQL(
+ userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
+
+ new Iterator[InternalRow] {
+ // Used to convert `Row`s containing data columns into `InternalRow`s.
+ private val encoderForDataColumns = RowEncoder(requiredSchema)
+
+ private[this] var completed = false
+
+ override def hasNext: Boolean = {
+ if (completed) {
+ false
+ } else {
+ val r = reader.hasNext && !reader.pastSync(stop)
+ if (!r) {
+ reader.close()
+ completed = true
+ }
+ r
+ }
+ }
+
+ override def next(): InternalRow = {
+ if (reader.pastSync(stop)) {
+ throw new NoSuchElementException("next on empty iterator")
+ }
+ val record = reader.next()
+ val safeDataRow = rowConverter(record).asInstanceOf[GenericRow]
+
+ // The safeDataRow is reused, we must do a copy
+ encoderForDataColumns.toRow(safeDataRow)
+ }
+ }
+ }
+ }
+ }
+}
+
+private[avro] object AvroFileFormat {
+ val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
+
+ val AvroSchema = "avroSchema"
+
+ class SerializableConfiguration(@transient var value: Configuration)
+ extends Serializable with KryoSerializable {
+ @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
+
+ private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+ value = new Configuration(false)
+ value.readFields(in)
+ }
+
+ private def tryOrIOException[T](block: => T): T = {
+ try {
+ block
+ } catch {
+ case e: IOException =>
+ log.error("Exception encountered", e)
+ throw e
+ case NonFatal(e) =>
+ log.error("Exception encountered", e)
+ throw new IOException(e)
+ }
+ }
+
+ def write(kryo: Kryo, out: Output): Unit = {
+ val dos = new DataOutputStream(out)
+ value.write(dos)
+ dos.flush()
+ }
+
+ def read(kryo: Kryo, in: Input): Unit = {
+ value = new Configuration(false)
+ value.readFields(new DataInputStream(in))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
new file mode 100644
index 0000000..830bf3c
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.{IOException, OutputStream}
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.util.HashMap
+
+import scala.collection.immutable.Map
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.mapred.AvroKey
+import org.apache.avro.mapreduce.AvroKeyOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.types._
+
+// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
+private[avro] class AvroOutputWriter(
+ path: String,
+ context: TaskAttemptContext,
+ schema: StructType,
+ recordName: String,
+ recordNamespace: String) extends OutputWriter {
+
+ private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace)
+ // copy of the old conversion logic after api change in SPARK-19085
+ private lazy val internalRowConverter =
+ CatalystTypeConverters.createToScalaConverter(schema).asInstanceOf[InternalRow => Row]
+
+ /**
+ * Overrides the couple of methods responsible for generating the output streams / files so
+ * that the data can be correctly partitioned
+ */
+ private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
+ new AvroKeyOutputFormat[GenericRecord]() {
+
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ new Path(path)
+ }
+
+ @throws(classOf[IOException])
+ override def getAvroFileOutputStream(c: TaskAttemptContext): OutputStream = {
+ val path = getDefaultWorkFile(context, ".avro")
+ path.getFileSystem(context.getConfiguration).create(path)
+ }
+
+ }.getRecordWriter(context)
+
+ override def write(internalRow: InternalRow): Unit = {
+ val row = internalRowConverter(internalRow)
+ val key = new AvroKey(converter(row).asInstanceOf[GenericRecord])
+ recordWriter.write(key, NullWritable.get())
+ }
+
+ override def close(): Unit = recordWriter.close(context)
+
+ /**
+ * This function constructs converter function for a given sparkSQL datatype. This is used in
+ * writing Avro records out to disk
+ */
+ private def createConverterToAvro(
+ dataType: DataType,
+ structName: String,
+ recordNamespace: String): (Any) => Any = {
+ dataType match {
+ case BinaryType => (item: Any) => item match {
+ case null => null
+ case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
+ }
+ case ByteType | ShortType | IntegerType | LongType |
+ FloatType | DoubleType | StringType | BooleanType => identity
+ case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+ case TimestampType => (item: Any) =>
+ if (item == null) null else item.asInstanceOf[Timestamp].getTime
+ case DateType => (item: Any) =>
+ if (item == null) null else item.asInstanceOf[Date].getTime
+ case ArrayType(elementType, _) =>
+ val elementConverter = createConverterToAvro(
+ elementType,
+ structName,
+ SchemaConverters.getNewRecordNamespace(elementType, recordNamespace, structName))
+ (item: Any) => {
+ if (item == null) {
+ null
+ } else {
+ val sourceArray = item.asInstanceOf[Seq[Any]]
+ val sourceArraySize = sourceArray.size
+ val targetArray = new Array[Any](sourceArraySize)
+ var idx = 0
+ while (idx < sourceArraySize) {
+ targetArray(idx) = elementConverter(sourceArray(idx))
+ idx += 1
+ }
+ targetArray
+ }
+ }
+ case MapType(StringType, valueType, _) =>
+ val valueConverter = createConverterToAvro(
+ valueType,
+ structName,
+ SchemaConverters.getNewRecordNamespace(valueType, recordNamespace, structName))
+ (item: Any) => {
+ if (item == null) {
+ null
+ } else {
+ val javaMap = new HashMap[String, Any]()
+ item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
+ javaMap.put(key, valueConverter(value))
+ }
+ javaMap
+ }
+ }
+ case structType: StructType =>
+ val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
+ val schema: Schema = SchemaConverters.convertStructToAvro(
+ structType, builder, recordNamespace)
+ val fieldConverters = structType.fields.map(field =>
+ createConverterToAvro(
+ field.dataType,
+ field.name,
+ SchemaConverters.getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
+ (item: Any) => {
+ if (item == null) {
+ null
+ } else {
+ val record = new Record(schema)
+ val convertersIterator = fieldConverters.iterator
+ val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
+ val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+
+ while (convertersIterator.hasNext) {
+ val converter = convertersIterator.next()
+ record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
+ }
+ record
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
new file mode 100644
index 0000000..5b2ce7d
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroOutputWriterFactory(
+ schema: StructType,
+ recordName: String,
+ recordNamespace: String) extends OutputWriterFactory {
+
+ override def getFileExtension(context: TaskAttemptContext): String = ".avro"
+
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
new file mode 100644
index 0000000..01f8c74
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.SchemaBuilder._
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.GenericFixed
+
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains methods that are used to convert sparkSQL schemas to avro schemas and vice
+ * versa.
+ */
+object SchemaConverters {
+
+ class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+
+ case class SchemaType(dataType: DataType, nullable: Boolean)
+
+ /**
+ * This function takes an avro schema and returns a sql schema.
+ */
+ def toSqlType(avroSchema: Schema): SchemaType = {
+ avroSchema.getType match {
+ case INT => SchemaType(IntegerType, nullable = false)
+ case STRING => SchemaType(StringType, nullable = false)
+ case BOOLEAN => SchemaType(BooleanType, nullable = false)
+ case BYTES => SchemaType(BinaryType, nullable = false)
+ case DOUBLE => SchemaType(DoubleType, nullable = false)
+ case FLOAT => SchemaType(FloatType, nullable = false)
+ case LONG => SchemaType(LongType, nullable = false)
+ case FIXED => SchemaType(BinaryType, nullable = false)
+ case ENUM => SchemaType(StringType, nullable = false)
+
+ case RECORD =>
+ val fields = avroSchema.getFields.asScala.map { f =>
+ val schemaType = toSqlType(f.schema())
+ StructField(f.name, schemaType.dataType, schemaType.nullable)
+ }
+
+ SchemaType(StructType(fields), nullable = false)
+
+ case ARRAY =>
+ val schemaType = toSqlType(avroSchema.getElementType)
+ SchemaType(
+ ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
+ nullable = false)
+
+ case MAP =>
+ val schemaType = toSqlType(avroSchema.getValueType)
+ SchemaType(
+ MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
+ nullable = false)
+
+ case UNION =>
+ if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+ // In case of a union with null, eliminate it and make a recursive call
+ val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+ if (remainingUnionTypes.size == 1) {
+ toSqlType(remainingUnionTypes.head).copy(nullable = true)
+ } else {
+ toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true)
+ }
+ } else avroSchema.getTypes.asScala.map(_.getType) match {
+ case Seq(t1) =>
+ toSqlType(avroSchema.getTypes.get(0))
+ case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
+ SchemaType(LongType, nullable = false)
+ case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
+ SchemaType(DoubleType, nullable = false)
+ case _ =>
+ // Convert complex unions to struct types where field names are member0, member1, etc.
+ // This is consistent with the behavior when converting between Avro and Parquet.
+ val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
+ case (s, i) =>
+ val schemaType = toSqlType(s)
+ // All fields are nullable because only one of them is set at a time
+ StructField(s"member$i", schemaType.dataType, nullable = true)
+ }
+
+ SchemaType(StructType(fields), nullable = false)
+ }
+
+ case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
+ }
+ }
+
+ /**
+ * This function converts sparkSQL StructType into avro schema. This method uses two other
+ * converter methods in order to do the conversion.
+ */
+ def convertStructToAvro[T](
+ structType: StructType,
+ schemaBuilder: RecordBuilder[T],
+ recordNamespace: String): T = {
+ val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields()
+ structType.fields.foreach { field =>
+ val newField = fieldsAssembler.name(field.name).`type`()
+
+ if (field.nullable) {
+ convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace)
+ .noDefault
+ } else {
+ convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace)
+ .noDefault
+ }
+ }
+ fieldsAssembler.endRecord()
+ }
+
+ /**
+ * Returns a converter function to convert row in avro format to GenericRow of catalyst.
+ *
+ * @param sourceAvroSchema Source schema before conversion, inferred from avro file or passed in
+ * by user.
+ * @param targetSqlType Target catalyst sql type after the conversion.
+ * @return returns a converter function to convert row in avro format to GenericRow of catalyst.
+ */
+ private[avro] def createConverterToSQL(
+ sourceAvroSchema: Schema,
+ targetSqlType: DataType): AnyRef => AnyRef = {
+
+ def createConverter(avroSchema: Schema,
+ sqlType: DataType, path: List[String]): AnyRef => AnyRef = {
+ val avroType = avroSchema.getType
+ (sqlType, avroType) match {
+ // Avro strings are in Utf8, so we have to call toString on them
+ case (StringType, STRING) | (StringType, ENUM) =>
+ (item: AnyRef) => item.toString
+ // Primitives pass through unchanged; byte arrays (handled below) are reused by avro and copied.
+ case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) |
+ (FloatType, FLOAT) | (LongType, LONG) =>
+ identity
+ case (TimestampType, LONG) =>
+ (item: AnyRef) => new Timestamp(item.asInstanceOf[Long])
+ case (DateType, LONG) =>
+ (item: AnyRef) => new Date(item.asInstanceOf[Long])
+ case (BinaryType, FIXED) =>
+ (item: AnyRef) => item.asInstanceOf[GenericFixed].bytes().clone()
+ case (BinaryType, BYTES) =>
+ (item: AnyRef) =>
+ val byteBuffer = item.asInstanceOf[ByteBuffer]
+ val bytes = new Array[Byte](byteBuffer.remaining)
+ byteBuffer.get(bytes)
+ bytes
+ case (struct: StructType, RECORD) =>
+ val length = struct.fields.length
+ val converters = new Array[AnyRef => AnyRef](length)
+ val avroFieldIndexes = new Array[Int](length)
+ var i = 0
+ while (i < length) {
+ val sqlField = struct.fields(i)
+ val avroField = avroSchema.getField(sqlField.name)
+ if (avroField != null) {
+ val converter = (item: AnyRef) => {
+ if (item == null) {
+ item
+ } else {
+ createConverter(avroField.schema, sqlField.dataType, path :+ sqlField.name)(item)
+ }
+ }
+ converters(i) = converter
+ avroFieldIndexes(i) = avroField.pos()
+ } else if (!sqlField.nullable) {
+ throw new IncompatibleSchemaException(
+ s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
+ "in Avro schema\n" +
+ s"Source Avro schema: $sourceAvroSchema.\n" +
+ s"Target Catalyst type: $targetSqlType")
+ }
+ i += 1
+ }
+
+ (item: AnyRef) =>
+ val record = item.asInstanceOf[GenericRecord]
+ val result = new Array[Any](length)
+ var i = 0
+ while (i < converters.length) {
+ if (converters(i) != null) {
+ val converter = converters(i)
+ result(i) = converter(record.get(avroFieldIndexes(i)))
+ }
+ i += 1
+ }
+ new GenericRow(result)
+ case (arrayType: ArrayType, ARRAY) =>
+ val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType,
+ path)
+ val allowsNull = arrayType.containsNull
+ (item: AnyRef) =>
+ item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element =>
+ if (element == null && !allowsNull) {
+ throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " +
+ "allowed to be null")
+ } else {
+ elementConverter(element)
+ }
+ }
+ case (mapType: MapType, MAP) if mapType.keyType == StringType =>
+ val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path)
+ val allowsNull = mapType.valueContainsNull
+ (item: AnyRef) =>
+ item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { case (k, v) =>
+ if (v == null && !allowsNull) {
+ throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " +
+ "allowed to be null")
+ } else {
+ (k.toString, valueConverter(v))
+ }
+ }.toMap
+ case (sqlType, UNION) =>
+ if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+ val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+ if (remainingUnionTypes.size == 1) {
+ createConverter(remainingUnionTypes.head, sqlType, path)
+ } else {
+ createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
+ }
+ } else avroSchema.getTypes.asScala.map(_.getType) match {
+ case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
+ case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
+ (item: AnyRef) =>
+ item match {
+ case l: java.lang.Long => l
+ case i: java.lang.Integer => new java.lang.Long(i.longValue())
+ }
+ case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType =>
+ (item: AnyRef) =>
+ item match {
+ case d: java.lang.Double => d
+ case f: java.lang.Float => new java.lang.Double(f.doubleValue())
+ }
+ case other =>
+ sqlType match {
+ case t: StructType if t.fields.length == avroSchema.getTypes.size =>
+ val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map {
+ case (field, schema) =>
+ createConverter(schema, field.dataType, path :+ field.name)
+ }
+ (item: AnyRef) =>
+ val i = GenericData.get().resolveUnion(avroSchema, item)
+ val converted = new Array[Any](fieldConverters.length)
+ converted(i) = fieldConverters(i)(item)
+ new GenericRow(converted)
+ case _ => throw new IncompatibleSchemaException(
+ s"Cannot convert Avro schema to catalyst type because schema at path " +
+ s"${path.mkString(".")} is not compatible " +
+ s"(avroType = $other, sqlType = $sqlType). \n" +
+ s"Source Avro schema: $sourceAvroSchema.\n" +
+ s"Target Catalyst type: $targetSqlType")
+ }
+ }
+ case (left, right) =>
+ throw new IncompatibleSchemaException(
+ s"Cannot convert Avro schema to catalyst type because schema at path " +
+ s"${path.mkString(".")} is not compatible (avroType = $right, sqlType = $left). \n" +
+ s"Source Avro schema: $sourceAvroSchema.\n" +
+ s"Target Catalyst type: $targetSqlType")
+ }
+ }
+ createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
+ }
+
+ /**
+ * This function is used to convert some sparkSQL type to avro type. Note that this function won't
+ * be used to construct fields of avro record (convertFieldTypeToAvro is used for that).
+ */
+ private def convertTypeToAvro[T](
+ dataType: DataType,
+ schemaBuilder: BaseTypeBuilder[T],
+ structName: String,
+ recordNamespace: String): T = {
+ dataType match {
+ case ByteType => schemaBuilder.intType()
+ case ShortType => schemaBuilder.intType()
+ case IntegerType => schemaBuilder.intType()
+ case LongType => schemaBuilder.longType()
+ case FloatType => schemaBuilder.floatType()
+ case DoubleType => schemaBuilder.doubleType()
+ case _: DecimalType => schemaBuilder.stringType()
+ case StringType => schemaBuilder.stringType()
+ case BinaryType => schemaBuilder.bytesType()
+ case BooleanType => schemaBuilder.booleanType()
+ case TimestampType => schemaBuilder.longType()
+ case DateType => schemaBuilder.longType()
+
+ case ArrayType(elementType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+ val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace)
+ schemaBuilder.array().items(elementSchema)
+
+ case MapType(StringType, valueType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+ val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace)
+ schemaBuilder.map().values(valueSchema)
+
+ case structType: StructType =>
+ convertStructToAvro(
+ structType,
+ schemaBuilder.record(structName).namespace(recordNamespace),
+ recordNamespace)
+
+ case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+ }
+ }
+
+ /**
+ * This function is used to construct fields of the avro record, where schema of the field is
+ * specified by avro representation of dataType. Since builders for record fields are different
+ * from those for everything else, we have to use a separate method.
+ */
+ private def convertFieldTypeToAvro[T](
+ dataType: DataType,
+ newFieldBuilder: BaseFieldTypeBuilder[T],
+ structName: String,
+ recordNamespace: String): FieldDefault[T, _] = {
+ dataType match {
+ case ByteType => newFieldBuilder.intType()
+ case ShortType => newFieldBuilder.intType()
+ case IntegerType => newFieldBuilder.intType()
+ case LongType => newFieldBuilder.longType()
+ case FloatType => newFieldBuilder.floatType()
+ case DoubleType => newFieldBuilder.doubleType()
+ case _: DecimalType => newFieldBuilder.stringType()
+ case StringType => newFieldBuilder.stringType()
+ case BinaryType => newFieldBuilder.bytesType()
+ case BooleanType => newFieldBuilder.booleanType()
+ case TimestampType => newFieldBuilder.longType()
+ case DateType => newFieldBuilder.longType()
+
+ case ArrayType(elementType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+ val elementSchema = convertTypeToAvro(
+ elementType,
+ builder,
+ structName,
+ getNewRecordNamespace(elementType, recordNamespace, structName))
+ newFieldBuilder.array().items(elementSchema)
+
+ case MapType(StringType, valueType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+ val valueSchema = convertTypeToAvro(
+ valueType,
+ builder,
+ structName,
+ getNewRecordNamespace(valueType, recordNamespace, structName))
+ newFieldBuilder.map().values(valueSchema)
+
+ case structType: StructType =>
+ convertStructToAvro(
+ structType,
+ newFieldBuilder.record(structName).namespace(s"$recordNamespace.$structName"),
+ s"$recordNamespace.$structName")
+
+ case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+ }
+ }
+
+ /**
+ * Returns a new namespace depending on the data type of the element.
+ * If the data type is a StructType it returns the current namespace concatenated
+ * with the element name, otherwise it returns the current namespace as it is.
+ */
+ private[avro] def getNewRecordNamespace(
+ elementDataType: DataType,
+ currentRecordNamespace: String,
+ elementName: String): String = {
+
+ elementDataType match {
+ case StructType(_) => s"$currentRecordNamespace.$elementName"
+ case _ => currentRecordNamespace
+ }
+ }
+
+ private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = {
+ if (isNullable) {
+ SchemaBuilder.builder().nullable()
+ } else {
+ SchemaBuilder.builder()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
new file mode 100755
index 0000000..b3c8a66
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+package object avro {
+ /**
+ * Adds a method, `avro`, to DataFrameWriter that allows you to write avro files using
+ * the DataFileWriter
+ */
+ implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
+ def avro: String => Unit = writer.format("avro").save
+ }
+
+ /**
+ * Adds a method, `avro`, to DataFrameReader that allows you to read avro files using
+ * the DataFileReader
+ */
+ implicit class AvroDataFrameReader(reader: DataFrameReader) {
+ def avro: String => DataFrame = reader.format("avro").load
+
+ @scala.annotation.varargs
+ def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/episodes.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/episodes.avro b/external/avro/src/test/resources/episodes.avro
new file mode 100644
index 0000000..58a028c
Binary files /dev/null and b/external/avro/src/test/resources/episodes.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c18a724
--- /dev/null
+++ b/external/avro/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Set the threshold of the File Appender to INFO
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
new file mode 100755
index 0000000..fece892
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
new file mode 100755
index 0000000..1ca623a
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
new file mode 100755
index 0000000..a12e945
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
new file mode 100755
index 0000000..60c0956
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
new file mode 100755
index 0000000..af56dfc
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
new file mode 100755
index 0000000..87d7844
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
new file mode 100755
index 0000000..c326fc4
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
new file mode 100755
index 0000000..279f36c
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
new file mode 100755
index 0000000..8d70f5d
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
new file mode 100755
index 0000000..6839d72
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
new file mode 100755
index 0000000..aedc7f7
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.avro b/external/avro/src/test/resources/test.avro
new file mode 100644
index 0000000..6425e21
Binary files /dev/null and b/external/avro/src/test/resources/test.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.avsc
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.avsc b/external/avro/src/test/resources/test.avsc
new file mode 100644
index 0000000..d7119a0
--- /dev/null
+++ b/external/avro/src/test/resources/test.avsc
@@ -0,0 +1,53 @@
+{
+ "type" : "record",
+ "name" : "test_schema",
+ "fields" : [{
+ "name" : "string",
+ "type" : "string",
+ "doc" : "Meaningless string of characters"
+ }, {
+ "name" : "simple_map",
+ "type" : {"type": "map", "values": "int"}
+ }, {
+ "name" : "complex_map",
+ "type" : {"type": "map", "values": {"type": "map", "values": "string"}}
+ }, {
+ "name" : "union_string_null",
+ "type" : ["null", "string"]
+ }, {
+ "name" : "union_int_long_null",
+ "type" : ["int", "long", "null"]
+ }, {
+ "name" : "union_float_double",
+ "type" : ["float", "double"]
+ }, {
+ "name": "fixed3",
+ "type": {"type": "fixed", "size": 3, "name": "fixed3"}
+ }, {
+ "name": "fixed2",
+ "type": {"type": "fixed", "size": 2, "name": "fixed2"}
+ }, {
+ "name": "enum",
+ "type": { "type": "enum",
+ "name": "Suit",
+ "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+ }
+ }, {
+ "name": "record",
+ "type": {
+ "type": "record",
+ "name": "record",
+ "aliases": ["RecordAlias"],
+ "fields" : [{
+ "name": "value_field",
+ "type": "string"
+ }]
+ }
+ }, {
+ "name": "array_of_boolean",
+ "type": {"type": "array", "items": "boolean"}
+ }, {
+ "name": "bytes",
+ "type": "bytes"
+ }]
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.json
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.json b/external/avro/src/test/resources/test.json
new file mode 100644
index 0000000..780189a
--- /dev/null
+++ b/external/avro/src/test/resources/test.json
@@ -0,0 +1,42 @@
+{
+ "string": "OMG SPARK IS AWESOME",
+ "simple_map": {"abc": 1, "bcd": 7},
+ "complex_map": {"key": {"a": "b", "c": "d"}},
+ "union_string_null": {"string": "abc"},
+ "union_int_long_null": {"int": 1},
+ "union_float_double": {"float": 3.1415926535},
+ "fixed3":"\u0002\u0003\u0004",
+ "fixed2":"\u0011\u0012",
+ "enum": "SPADES",
+ "record": {"value_field": "Two things are infinite: the universe and human stupidity; and I'm not sure about universe."},
+ "array_of_boolean": [true, false, false],
+ "bytes": "\u0041\u0042\u0043"
+}
+{
+ "string": "Terran is IMBA!",
+ "simple_map": {"mmm": 0, "qqq": 66},
+ "complex_map": {"key": {"1": "2", "3": "4"}},
+ "union_string_null": {"string": "123"},
+ "union_int_long_null": {"long": 66},
+ "union_float_double": {"double": 6.6666666666666},
+ "fixed3":"\u0007\u0007\u0007",
+ "fixed2":"\u0001\u0002",
+ "enum": "CLUBS",
+ "record": {"value_field": "Life did not intend to make us perfect. Whoever is perfect belongs in a museum."},
+ "array_of_boolean": [],
+ "bytes": ""
+}
+{
+ "string": "The cake is a LIE!",
+ "simple_map": {},
+ "complex_map": {"key": {}},
+ "union_string_null": {"null": null},
+ "union_int_long_null": {"null": null},
+ "union_float_double": {"double": 0},
+ "fixed3":"\u0011\u0022\u0009",
+ "fixed2":"\u0010\u0090",
+ "enum": "DIAMONDS",
+ "record": {"value_field": "TEST_STR123"},
+ "array_of_boolean": [false],
+ "bytes": "\u0053"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org