Posted to commits@spark.apache.org by li...@apache.org on 2018/07/12 20:55:30 UTC
[1/2] spark git commit: [SPARK-24768][SQL] Have a built-in AVRO data source implementation
Repository: spark
Updated Branches:
refs/heads/master 1055c94cd -> 395860a98
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
new file mode 100644
index 0000000..c6c1e40
--- /dev/null
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.nio.file.Files
+import java.sql.{Date, Timestamp}
+import java.util.{TimeZone, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.{Field, Type}
+import org.apache.avro.file.DataFileWriter
+import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
+import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql._
+import org.apache.spark.sql.avro.SchemaConverters.IncompatibleSchemaException
+import org.apache.spark.sql.types._
+
+class AvroSuite extends SparkFunSuite {
+ val episodesFile = "src/test/resources/episodes.avro"
+ val testFile = "src/test/resources/test.avro"
+
+ private var spark: SparkSession = _
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ spark = SparkSession.builder()
+ .master("local[2]")
+ .appName("AvroSuite")
+ .config("spark.sql.files.maxPartitionBytes", 1024)
+ .getOrCreate()
+ }
+
+ override protected def afterAll(): Unit = {
+ try {
+ spark.sparkContext.stop()
+ } finally {
+ super.afterAll()
+ }
+ }
+
+ test("reading from multiple paths") {
+ val df = spark.read.avro(episodesFile, episodesFile)
+ assert(df.count == 16)
+ }
+
+ test("reading and writing partitioned data") {
+ val df = spark.read.avro(episodesFile)
+ val fields = List("title", "air_date", "doctor")
+ for (field <- fields) {
+ TestUtils.withTempDir { dir =>
+ val outputDir = s"$dir/${UUID.randomUUID}"
+ df.write.partitionBy(field).avro(outputDir)
+ val input = spark.read.avro(outputDir)
+ // makes sure that no fields got dropped.
+ // We convert Rows to Seqs in order to work around SPARK-10325
+ assert(input.select(field).collect().map(_.toSeq).toSet ===
+ df.select(field).collect().map(_.toSeq).toSet)
+ }
+ }
+ }
+
+ test("request no fields") {
+ val df = spark.read.avro(episodesFile)
+ df.registerTempTable("avro_table")
+ assert(spark.sql("select count(*) from avro_table").collect().head === Row(8))
+ }
+
+ test("convert formats") {
+ TestUtils.withTempDir { dir =>
+ val df = spark.read.avro(episodesFile)
+ df.write.parquet(dir.getCanonicalPath)
+ assert(spark.read.parquet(dir.getCanonicalPath).count() === df.count)
+ }
+ }
+
+ test("rearrange internal schema") {
+ TestUtils.withTempDir { dir =>
+ val df = spark.read.avro(episodesFile)
+ df.select("doctor", "title").write.avro(dir.getCanonicalPath)
+ }
+ }
+
+ test("test NULL avro type") {
+ TestUtils.withTempDir { dir =>
+ val fields = Seq(new Field("null", Schema.create(Type.NULL), "doc", null)).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+ val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(schema, new File(s"$dir.avro"))
+ val avroRec = new GenericData.Record(schema)
+ avroRec.put("null", null)
+ dataFileWriter.append(avroRec)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+
+ intercept[IncompatibleSchemaException] {
+ spark.read.avro(s"$dir.avro")
+ }
+ }
+ }
+
+ test("union(int, long) is read as long") {
+ TestUtils.withTempDir { dir =>
+ val avroSchema: Schema = {
+ val union =
+ Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG)).asJava)
+ val fields = Seq(new Field("field1", union, "doc", null)).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+ schema
+ }
+
+ val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(avroSchema, new File(s"$dir.avro"))
+ val rec1 = new GenericData.Record(avroSchema)
+ rec1.put("field1", 1.toLong)
+ dataFileWriter.append(rec1)
+ val rec2 = new GenericData.Record(avroSchema)
+ rec2.put("field1", 2)
+ dataFileWriter.append(rec2)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+ val df = spark.read.avro(s"$dir.avro")
+ assert(df.schema.fields === Seq(StructField("field1", LongType, nullable = true)))
+ assert(df.collect().toSet == Set(Row(1L), Row(2L)))
+ }
+ }
+
+ test("union(float, double) is read as double") {
+ TestUtils.withTempDir { dir =>
+ val avroSchema: Schema = {
+ val union =
+ Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE)).asJava)
+ val fields = Seq(new Field("field1", union, "doc", null)).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+ schema
+ }
+
+ val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(avroSchema, new File(s"$dir.avro"))
+ val rec1 = new GenericData.Record(avroSchema)
+ rec1.put("field1", 1.toFloat)
+ dataFileWriter.append(rec1)
+ val rec2 = new GenericData.Record(avroSchema)
+ rec2.put("field1", 2.toDouble)
+ dataFileWriter.append(rec2)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+ val df = spark.read.avro(s"$dir.avro")
+ assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true)))
+ assert(df.collect().toSet == Set(Row(1.toDouble), Row(2.toDouble)))
+ }
+ }
+
+ test("union(float, double, null) is read as nullable double") {
+ TestUtils.withTempDir { dir =>
+ val avroSchema: Schema = {
+ val union = Schema.createUnion(
+ List(Schema.create(Type.FLOAT),
+ Schema.create(Type.DOUBLE),
+ Schema.create(Type.NULL)
+ ).asJava
+ )
+ val fields = Seq(new Field("field1", union, "doc", null)).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+ schema
+ }
+
+ val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(avroSchema, new File(s"$dir.avro"))
+ val rec1 = new GenericData.Record(avroSchema)
+ rec1.put("field1", 1.toFloat)
+ dataFileWriter.append(rec1)
+ val rec2 = new GenericData.Record(avroSchema)
+ rec2.put("field1", null)
+ dataFileWriter.append(rec2)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+ val df = spark.read.avro(s"$dir.avro")
+ assert(df.schema.fields === Seq(StructField("field1", DoubleType, nullable = true)))
+ assert(df.collect().toSet == Set(Row(1.toDouble), Row(null)))
+ }
+ }
+
+ test("Union of a single type") {
+ TestUtils.withTempDir { dir =>
+ val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT)).asJava)
+ val fields = Seq(new Field("field1", UnionOfOne, "doc", null)).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+
+ val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(schema, new File(s"$dir.avro"))
+ val avroRec = new GenericData.Record(schema)
+
+ avroRec.put("field1", 8)
+
+ dataFileWriter.append(avroRec)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+
+ val df = spark.read.avro(s"$dir.avro")
+ assert(df.first() == Row(8))
+ }
+ }
+
+ test("Complex Union Type") {
+ TestUtils.withTempDir { dir =>
+ val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)
+ val enumSchema = Schema.createEnum("enum_name", "doc", "namespace", List("e1", "e2").asJava)
+ val complexUnionType = Schema.createUnion(
+ List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema).asJava)
+ val fields = Seq(
+ new Field("field1", complexUnionType, "doc", null),
+ new Field("field2", complexUnionType, "doc", null),
+ new Field("field3", complexUnionType, "doc", null),
+ new Field("field4", complexUnionType, "doc", null)
+ ).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+ val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(schema, new File(s"$dir.avro"))
+ val avroRec = new GenericData.Record(schema)
+ val field1 = 1234
+ val field2 = "Hope that was not load bearing"
+ val field3 = Array[Byte](1, 2, 3, 4)
+ val field4 = "e2"
+ avroRec.put("field1", field1)
+ avroRec.put("field2", field2)
+ avroRec.put("field3", new Fixed(fixedSchema, field3))
+ avroRec.put("field4", new EnumSymbol(enumSchema, field4))
+ dataFileWriter.append(avroRec)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+
+ val df = spark.sqlContext.read.avro(s"$dir.avro")
+ assertResult(field1)(df.selectExpr("field1.member0").first().get(0))
+ assertResult(field2)(df.selectExpr("field2.member1").first().get(0))
+ assertResult(field3)(df.selectExpr("field3.member2").first().get(0))
+ assertResult(field4)(df.selectExpr("field4.member3").first().get(0))
+ }
+ }
+
+ test("Lots of nulls") {
+ TestUtils.withTempDir { dir =>
+ val schema = StructType(Seq(
+ StructField("binary", BinaryType, true),
+ StructField("timestamp", TimestampType, true),
+ StructField("array", ArrayType(ShortType), true),
+ StructField("map", MapType(StringType, StringType), true),
+ StructField("struct", StructType(Seq(StructField("int", IntegerType, true))))))
+ val rdd = spark.sparkContext.parallelize(Seq[Row](
+ Row(null, new Timestamp(1), Array[Short](1, 2, 3), null, null),
+ Row(null, null, null, null, null),
+ Row(null, null, null, null, null),
+ Row(null, null, null, null, null)))
+ val df = spark.createDataFrame(rdd, schema)
+ df.write.avro(dir.toString)
+ assert(spark.read.avro(dir.toString).count == rdd.count)
+ }
+ }
+
+ test("Struct field type") {
+ TestUtils.withTempDir { dir =>
+ val schema = StructType(Seq(
+ StructField("float", FloatType, true),
+ StructField("short", ShortType, true),
+ StructField("byte", ByteType, true),
+ StructField("boolean", BooleanType, true)
+ ))
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(1f, 1.toShort, 1.toByte, true),
+ Row(2f, 2.toShort, 2.toByte, true),
+ Row(3f, 3.toShort, 3.toByte, true)
+ ))
+ val df = spark.createDataFrame(rdd, schema)
+ df.write.avro(dir.toString)
+ assert(spark.read.avro(dir.toString).count == rdd.count)
+ }
+ }
+
+ test("Date field type") {
+ TestUtils.withTempDir { dir =>
+ val schema = StructType(Seq(
+ StructField("float", FloatType, true),
+ StructField("date", DateType, true)
+ ))
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(1f, null),
+ Row(2f, new Date(1451948400000L)),
+ Row(3f, new Date(1460066400500L))
+ ))
+ val df = spark.createDataFrame(rdd, schema)
+ df.write.avro(dir.toString)
+ assert(spark.read.avro(dir.toString).count == rdd.count)
+ assert(spark.read.avro(dir.toString).select("date").collect().map(_(0)).toSet ==
+ Array(null, 1451865600000L, 1459987200000L).toSet)
+ }
+ }
+
+ test("Array data types") {
+ TestUtils.withTempDir { dir =>
+ val testSchema = StructType(Seq(
+ StructField("byte_array", ArrayType(ByteType), true),
+ StructField("short_array", ArrayType(ShortType), true),
+ StructField("float_array", ArrayType(FloatType), true),
+ StructField("bool_array", ArrayType(BooleanType), true),
+ StructField("long_array", ArrayType(LongType), true),
+ StructField("double_array", ArrayType(DoubleType), true),
+ StructField("decimal_array", ArrayType(DecimalType(10, 0)), true),
+ StructField("bin_array", ArrayType(BinaryType), true),
+ StructField("timestamp_array", ArrayType(TimestampType), true),
+ StructField("array_array", ArrayType(ArrayType(StringType), true), true),
+ StructField("struct_array", ArrayType(
+ StructType(Seq(StructField("name", StringType, true)))))))
+
+ val arrayOfByte = new Array[Byte](4)
+ for (i <- arrayOfByte.indices) {
+ arrayOfByte(i) = i.toByte
+ }
+
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(arrayOfByte, Array[Short](1, 2, 3, 4), Array[Float](1f, 2f, 3f, 4f),
+ Array[Boolean](true, false, true, false), Array[Long](1L, 2L), Array[Double](1.0, 2.0),
+ Array[BigDecimal](BigDecimal.valueOf(3)), Array[Array[Byte]](arrayOfByte, arrayOfByte),
+ Array[Timestamp](new Timestamp(0)),
+ Array[Array[String]](Array[String]("CSH, tearing down the walls that divide us", "-jd")),
+ Array[Row](Row("Bobby G. can't swim")))))
+ val df = spark.createDataFrame(rdd, testSchema)
+ df.write.avro(dir.toString)
+ assert(spark.read.avro(dir.toString).count == rdd.count)
+ }
+ }
+
+ test("write with compression") {
+ TestUtils.withTempDir { dir =>
+ val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
+ val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
+ val uncompressDir = s"$dir/uncompress"
+ val deflateDir = s"$dir/deflate"
+ val snappyDir = s"$dir/snappy"
+ val fakeDir = s"$dir/fake"
+
+ val df = spark.read.avro(testFile)
+ spark.conf.set(AVRO_COMPRESSION_CODEC, "uncompressed")
+ df.write.avro(uncompressDir)
+ spark.conf.set(AVRO_COMPRESSION_CODEC, "deflate")
+ spark.conf.set(AVRO_DEFLATE_LEVEL, "9")
+ df.write.avro(deflateDir)
+ spark.conf.set(AVRO_COMPRESSION_CODEC, "snappy")
+ df.write.avro(snappyDir)
+
+ val uncompressSize = FileUtils.sizeOfDirectory(new File(uncompressDir))
+ val deflateSize = FileUtils.sizeOfDirectory(new File(deflateDir))
+ val snappySize = FileUtils.sizeOfDirectory(new File(snappyDir))
+
+ assert(uncompressSize > deflateSize)
+ assert(snappySize > deflateSize)
+ }
+ }
+
+ test("dsl test") {
+ val results = spark.read.avro(episodesFile).select("title").collect()
+ assert(results.length === 8)
+ }
+
+ test("support of various data types") {
+ // This test uses data from test.avro. You can see the data and the schema of this file in
+ // test.json and test.avsc
+ val all = spark.read.avro(testFile).collect()
+ assert(all.length == 3)
+
+ val str = spark.read.avro(testFile).select("string").collect()
+ assert(str.map(_(0)).toSet.contains("Terran is IMBA!"))
+
+ val simple_map = spark.read.avro(testFile).select("simple_map").collect()
+ assert(simple_map(0)(0).getClass.toString.contains("Map"))
+ assert(simple_map.map(_(0).asInstanceOf[Map[String, Some[Int]]].size).toSet == Set(2, 0))
+
+ val union0 = spark.read.avro(testFile).select("union_string_null").collect()
+ assert(union0.map(_(0)).toSet == Set("abc", "123", null))
+
+ val union1 = spark.read.avro(testFile).select("union_int_long_null").collect()
+ assert(union1.map(_(0)).toSet == Set(66, 1, null))
+
+ val union2 = spark.read.avro(testFile).select("union_float_double").collect()
+ assert(
+ union2
+ .map(x => new java.lang.Double(x(0).toString))
+ .exists(p => Math.abs(p - Math.PI) < 0.001))
+
+ val fixed = spark.read.avro(testFile).select("fixed3").collect()
+ assert(fixed.map(_(0).asInstanceOf[Array[Byte]]).exists(p => p(1) == 3))
+
+ val enum = spark.read.avro(testFile).select("enum").collect()
+ assert(enum.map(_(0)).toSet == Set("SPADES", "CLUBS", "DIAMONDS"))
+
+ val record = spark.read.avro(testFile).select("record").collect()
+ assert(record(0)(0).getClass.toString.contains("Row"))
+ assert(record.map(_(0).asInstanceOf[Row](0)).contains("TEST_STR123"))
+
+ val array_of_boolean = spark.read.avro(testFile).select("array_of_boolean").collect()
+ assert(array_of_boolean.map(_(0).asInstanceOf[Seq[Boolean]].size).toSet == Set(3, 1, 0))
+
+ val bytes = spark.read.avro(testFile).select("bytes").collect()
+ assert(bytes.map(_(0).asInstanceOf[Array[Byte]].length).toSet == Set(3, 1, 0))
+ }
+
+ test("sql test") {
+ spark.sql(
+ s"""
+ |CREATE TEMPORARY TABLE avroTable
+ |USING avro
+ |OPTIONS (path "$episodesFile")
+ """.stripMargin.replaceAll("\n", " "))
+
+ assert(spark.sql("SELECT * FROM avroTable").collect().length === 8)
+ }
+
+ test("conversion to avro and back") {
+ // Note that test.avro includes a variety of types, some of which are nullable. We expect to
+ // get the same values back.
+ TestUtils.withTempDir { dir =>
+ val avroDir = s"$dir/avro"
+ spark.read.avro(testFile).write.avro(avroDir)
+ TestUtils.checkReloadMatchesSaved(spark, testFile, avroDir)
+ }
+ }
+
+ test("conversion to avro and back with namespace") {
+ // Note that test.avro includes a variety of types, some of which are nullable. We expect to
+ // get the same values back.
+ TestUtils.withTempDir { tempDir =>
+ val name = "AvroTest"
+ val namespace = "com.databricks.spark.avro"
+ val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
+
+ val avroDir = tempDir + "/namedAvro"
+ spark.read.avro(testFile).write.options(parameters).avro(avroDir)
+ TestUtils.checkReloadMatchesSaved(spark, testFile, avroDir)
+
+ // Look at raw file and make sure has namespace info
+ val rawSaved = spark.sparkContext.textFile(avroDir)
+ val schema = rawSaved.collect().mkString("")
+ assert(schema.contains(name))
+ assert(schema.contains(namespace))
+ }
+ }
+
+ test("converting some specific sparkSQL types to avro") {
+ TestUtils.withTempDir { tempDir =>
+ val testSchema = StructType(Seq(
+ StructField("Name", StringType, false),
+ StructField("Length", IntegerType, true),
+ StructField("Time", TimestampType, false),
+ StructField("Decimal", DecimalType(10, 2), true),
+ StructField("Binary", BinaryType, false)))
+
+ val arrayOfByte = new Array[Byte](4)
+ for (i <- arrayOfByte.indices) {
+ arrayOfByte(i) = i.toByte
+ }
+ val cityRDD = spark.sparkContext.parallelize(Seq(
+ Row("San Francisco", 12, new Timestamp(666), null, arrayOfByte),
+ Row("Palo Alto", null, new Timestamp(777), null, arrayOfByte),
+ Row("Munich", 8, new Timestamp(42), Decimal(3.14), arrayOfByte)))
+ val cityDataFrame = spark.createDataFrame(cityRDD, testSchema)
+
+ val avroDir = tempDir + "/avro"
+ cityDataFrame.write.avro(avroDir)
+ assert(spark.read.avro(avroDir).collect().length == 3)
+
+ // Timestamps are converted to longs
+ val times = spark.read.avro(avroDir).select("Time").collect()
+ assert(times.map(_(0)).toSet == Set(666, 777, 42))
+
+ // DecimalType should be converted to string
+ val decimals = spark.read.avro(avroDir).select("Decimal").collect()
+ assert(decimals.map(_(0)).contains("3.14"))
+
+ // There should be a null entry
+ val length = spark.read.avro(avroDir).select("Length").collect()
+ assert(length.map(_(0)).contains(null))
+
+ val binary = spark.read.avro(avroDir).select("Binary").collect()
+ for (i <- arrayOfByte.indices) {
+ assert(binary(1)(0).asInstanceOf[Array[Byte]](i) == arrayOfByte(i))
+ }
+ }
+ }
+
+ test("correctly read long as date/timestamp type") {
+ TestUtils.withTempDir { tempDir =>
+ val sparkSession = spark
+ import sparkSession.implicits._
+
+ val currentTime = new Timestamp(System.currentTimeMillis())
+ val currentDate = new Date(System.currentTimeMillis())
+ val schema = StructType(Seq(
+ StructField("_1", DateType, false), StructField("_2", TimestampType, false)))
+ val writeDs = Seq((currentDate, currentTime)).toDS
+
+ val avroDir = tempDir + "/avro"
+ writeDs.write.avro(avroDir)
+ assert(spark.read.avro(avroDir).collect().length == 1)
+
+ val readDs = spark.read.schema(schema).avro(avroDir).as[(Date, Timestamp)]
+
+ assert(readDs.collect().sameElements(writeDs.collect()))
+ }
+ }
+
+ test("support of globbed paths") {
+ val e1 = spark.read.avro("*/test/resources/episodes.avro").collect()
+ assert(e1.length == 8)
+
+ val e2 = spark.read.avro("src/*/*/episodes.avro").collect()
+ assert(e2.length == 8)
+ }
+
+ test("does not coerce null date/timestamp value to 0 epoch.") {
+ TestUtils.withTempDir { tempDir =>
+ val sparkSession = spark
+ import sparkSession.implicits._
+
+ val nullTime: Timestamp = null
+ val nullDate: Date = null
+ val schema = StructType(Seq(
+ StructField("_1", DateType, nullable = true),
+ StructField("_2", TimestampType, nullable = true))
+ )
+ val writeDs = Seq((nullDate, nullTime)).toDS
+
+ val avroDir = tempDir + "/avro"
+ writeDs.write.avro(avroDir)
+ val readValues = spark.read.schema(schema).avro(avroDir).as[(Date, Timestamp)].collect
+
+ assert(readValues.size == 1)
+ assert(readValues.head == ((nullDate, nullTime)))
+ }
+ }
+
+ test("support user provided avro schema") {
+ val avroSchema =
+ """
+ |{
+ | "type" : "record",
+ | "name" : "test_schema",
+ | "fields" : [{
+ | "name" : "string",
+ | "type" : "string",
+ | "doc" : "Meaningless string of characters"
+ | }]
+ |}
+ """.stripMargin
+ val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema).avro(testFile).collect()
+ val expected = spark.read.avro(testFile).select("string").collect()
+ assert(result.sameElements(expected))
+ }
+
+ test("support user provided avro schema with defaults for missing fields") {
+ val avroSchema =
+ """
+ |{
+ | "type" : "record",
+ | "name" : "test_schema",
+ | "fields" : [{
+ | "name" : "missingField",
+ | "type" : "string",
+ | "default" : "foo"
+ | }]
+ |}
+ """.stripMargin
+ val result = spark.read.option(AvroFileFormat.AvroSchema, avroSchema)
+ .avro(testFile).select("missingField").first
+ assert(result === Row("foo"))
+ }
+
+ test("reading from invalid path throws exception") {
+
+ // Directory given has no avro files
+ intercept[AnalysisException] {
+ TestUtils.withTempDir(dir => spark.read.avro(dir.getCanonicalPath))
+ }
+
+ intercept[AnalysisException] {
+ spark.read.avro("very/invalid/path/123.avro")
+ }
+
+ // In case of globbed path that can't be matched to anything, another exception is thrown (and
+ // exception message is helpful)
+ intercept[AnalysisException] {
+ spark.read.avro("*/*/*/*/*/*/*/something.avro")
+ }
+
+ intercept[FileNotFoundException] {
+ TestUtils.withTempDir { dir =>
+ FileUtils.touch(new File(dir, "test"))
+ spark.read.avro(dir.toString)
+ }
+ }
+
+ }
+
+ test("SQL test insert overwrite") {
+ TestUtils.withTempDir { tempDir =>
+ val tempEmptyDir = s"$tempDir/sqlOverwrite"
+ // Create a temp directory for table that will be overwritten
+ new File(tempEmptyDir).mkdirs()
+ spark.sql(
+ s"""
+ |CREATE TEMPORARY TABLE episodes
+ |USING avro
+ |OPTIONS (path "$episodesFile")
+ """.stripMargin.replaceAll("\n", " "))
+ spark.sql(
+ s"""
+ |CREATE TEMPORARY TABLE episodesEmpty
+ |(name string, air_date string, doctor int)
+ |USING avro
+ |OPTIONS (path "$tempEmptyDir")
+ """.stripMargin.replaceAll("\n", " "))
+
+ assert(spark.sql("SELECT * FROM episodes").collect().length === 8)
+ assert(spark.sql("SELECT * FROM episodesEmpty").collect().isEmpty)
+
+ spark.sql(
+ s"""
+ |INSERT OVERWRITE TABLE episodesEmpty
+ |SELECT * FROM episodes
+ """.stripMargin.replaceAll("\n", " "))
+ assert(spark.sql("SELECT * FROM episodesEmpty").collect().length == 8)
+ }
+ }
+
+ test("test save and load") {
+ // Test if load works as expected
+ TestUtils.withTempDir { tempDir =>
+ val df = spark.read.avro(episodesFile)
+ assert(df.count == 8)
+
+ val tempSaveDir = s"$tempDir/save/"
+
+ df.write.avro(tempSaveDir)
+ val newDf = spark.read.avro(tempSaveDir)
+ assert(newDf.count == 8)
+ }
+ }
+
+ test("test load with non-Avro file") {
+ // Test if load works as expected
+ TestUtils.withTempDir { tempDir =>
+ val df = spark.read.avro(episodesFile)
+ assert(df.count == 8)
+
+ val tempSaveDir = s"$tempDir/save/"
+ df.write.avro(tempSaveDir)
+
+ Files.createFile(new File(tempSaveDir, "non-avro").toPath)
+
+ val newDf = spark
+ .read
+ .option(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, "true")
+ .avro(tempSaveDir)
+
+ assert(newDf.count == 8)
+ }
+ }
+
+ test("read avro with user defined schema: read partial columns") {
+ val partialColumns = StructType(Seq(
+ StructField("string", StringType, false),
+ StructField("simple_map", MapType(StringType, IntegerType), false),
+ StructField("complex_map", MapType(StringType, MapType(StringType, StringType)), false),
+ StructField("union_string_null", StringType, true),
+ StructField("union_int_long_null", LongType, true),
+ StructField("fixed3", BinaryType, true),
+ StructField("fixed2", BinaryType, true),
+ StructField("enum", StringType, false),
+ StructField("record", StructType(Seq(StructField("value_field", StringType, false))), false),
+ StructField("array_of_boolean", ArrayType(BooleanType), false),
+ StructField("bytes", BinaryType, true)))
+ val withSchema = spark.read.schema(partialColumns).avro(testFile).collect()
+ val withOutSchema = spark
+ .read
+ .avro(testFile)
+ .select("string", "simple_map", "complex_map", "union_string_null", "union_int_long_null",
+ "fixed3", "fixed2", "enum", "record", "array_of_boolean", "bytes")
+ .collect()
+ assert(withSchema.sameElements(withOutSchema))
+ }
+
+ test("read avro with user defined schema: read non-exist columns") {
+ val schema =
+ StructType(
+ Seq(
+ StructField("non_exist_string", StringType, true),
+ StructField(
+ "record",
+ StructType(Seq(
+ StructField("non_exist_field", StringType, false),
+ StructField("non_exist_field2", StringType, false))),
+ false)))
+ val withEmptyColumn = spark.read.schema(schema).avro(testFile).collect()
+
+ assert(withEmptyColumn.forall(_ == Row(null: String, Row(null: String, null: String))))
+ }
+
+ test("read avro file partitioned") {
+ TestUtils.withTempDir { dir =>
+ val sparkSession = spark
+ import sparkSession.implicits._
+ val df = (0 to 1024 * 3).toDS.map(i => s"record${i}").toDF("records")
+ val outputDir = s"$dir/${UUID.randomUUID}"
+ df.write.avro(outputDir)
+ val input = spark.read.avro(outputDir)
+ assert(input.collect.toSet.size === 1024 * 3 + 1)
+ assert(input.rdd.partitions.size > 2)
+ }
+ }
+
+ case class NestedBottom(id: Int, data: String)
+
+ case class NestedMiddle(id: Int, data: NestedBottom)
+
+ case class NestedTop(id: Int, data: NestedMiddle)
+
+ test("saving avro that has nested records with the same name") {
+ TestUtils.withTempDir { tempDir =>
+ // Save avro file on output folder path
+ val writeDf = spark.createDataFrame(List(NestedTop(1, NestedMiddle(2, NestedBottom(3, "1")))))
+ val outputFolder = s"$tempDir/duplicate_names/"
+ writeDf.write.avro(outputFolder)
+ // Read avro file saved on the last step
+ val readDf = spark.read.avro(outputFolder)
+ // Check that the written DataFrame equals the read DataFrame
+ assert(readDf.collect().sameElements(writeDf.collect()))
+ }
+ }
+
+ case class NestedMiddleArray(id: Int, data: Array[NestedBottom])
+
+ case class NestedTopArray(id: Int, data: NestedMiddleArray)
+
+ test("saving avro that has nested records with the same name inside an array") {
+ TestUtils.withTempDir { tempDir =>
+ // Save avro file on output folder path
+ val writeDf = spark.createDataFrame(
+ List(NestedTopArray(1, NestedMiddleArray(2, Array(
+ NestedBottom(3, "1"), NestedBottom(4, "2")
+ ))))
+ )
+ val outputFolder = s"$tempDir/duplicate_names_array/"
+ writeDf.write.avro(outputFolder)
+ // Read avro file saved on the last step
+ val readDf = spark.read.avro(outputFolder)
+ // Check that the written DataFrame equals the read DataFrame
+ assert(readDf.collect().sameElements(writeDf.collect()))
+ }
+ }
+
+ case class NestedMiddleMap(id: Int, data: Map[String, NestedBottom])
+
+ case class NestedTopMap(id: Int, data: NestedMiddleMap)
+
+ test("saving avro that has nested records with the same name inside a map") {
+ TestUtils.withTempDir { tempDir =>
+ // Save avro file on output folder path
+ val writeDf = spark.createDataFrame(
+ List(NestedTopMap(1, NestedMiddleMap(2, Map(
+ "1" -> NestedBottom(3, "1"), "2" -> NestedBottom(4, "2")
+ ))))
+ )
+ val outputFolder = s"$tempDir/duplicate_names_map/"
+ writeDf.write.avro(outputFolder)
+ // Read avro file saved on the last step
+ val readDf = spark.read.avro(outputFolder)
+ // Check that the written DataFrame equals the read DataFrame
+ assert(readDf.collect().sameElements(writeDf.collect()))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala
new file mode 100755
index 0000000..a0f8851
--- /dev/null
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/SerializableConfigurationSuite.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}
+
+class SerializableConfigurationSuite extends SparkFunSuite {
+
+ private def testSerialization(serializer: SerializerInstance): Unit = {
+ import AvroFileFormat.SerializableConfiguration
+ val conf = new SerializableConfiguration(new Configuration())
+
+ val serialized = serializer.serialize(conf)
+
+ serializer.deserialize[Any](serialized) match {
+ case c: SerializableConfiguration =>
+ assert(c.log != null, "log was null")
+ assert(c.value != null, "value was null")
+ case other => fail(
+ s"Expecting ${classOf[SerializableConfiguration]}, but got ${other.getClass}.")
+ }
+ }
+
+ test("serialization with JavaSerializer") {
+ testSerialization(new JavaSerializer(new SparkConf()).newInstance())
+ }
+
+ test("serialization with KryoSerializer") {
+ testSerialization(new KryoSerializer(new SparkConf()).newInstance())
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala
new file mode 100755
index 0000000..4ae9b14
--- /dev/null
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/TestUtils.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.{File, IOException}
+import java.nio.ByteBuffer
+
+import scala.collection.immutable.HashSet
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import com.google.common.io.Files
+import java.util
+
+import org.apache.spark.sql.SparkSession
+
+private[avro] object TestUtils {
+
+ /**
+ * This function checks that all records in a file match the original
+ * record.
+ */
+ def checkReloadMatchesSaved(spark: SparkSession, testFile: String, avroDir: String): Unit = {
+
+ def convertToString(elem: Any): String = {
+ elem match {
+ case null => "NULL" // HashSets can't have null in them, so we use a string instead
+ case arrayBuf: ArrayBuffer[_] =>
+ arrayBuf.asInstanceOf[ArrayBuffer[Any]].toArray.deep.mkString(" ")
+ case arrayByte: Array[Byte] => arrayByte.deep.mkString(" ")
+ case other => other.toString
+ }
+ }
+
+ val originalEntries = spark.read.avro(testFile).collect()
+ val newEntries = spark.read.avro(avroDir).collect()
+
+ assert(originalEntries.length == newEntries.length)
+
+ val origEntrySet = Array.fill(originalEntries(0).size)(new HashSet[Any]())
+ for (origEntry <- originalEntries) {
+ var idx = 0
+ for (origElement <- origEntry.toSeq) {
+ origEntrySet(idx) += convertToString(origElement)
+ idx += 1
+ }
+ }
+
+ for (newEntry <- newEntries) {
+ var idx = 0
+ for (newElement <- newEntry.toSeq) {
+ assert(origEntrySet(idx).contains(convertToString(newElement)))
+ idx += 1
+ }
+ }
+ }
+
+ def withTempDir(f: File => Unit): Unit = {
+ val dir = Files.createTempDir()
+ dir.delete()
+ try f(dir) finally deleteRecursively(dir)
+ }
+
+ /**
+ * This function deletes a file or a directory with everything that's in it. This function is
+ * copied from Spark with minor modifications made to it. See original source at:
+ * github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala
+ */
+
+ def deleteRecursively(file: File) {
+ def listFilesSafely(file: File): Seq[File] = {
+ if (file.exists()) {
+ val files = file.listFiles()
+ if (files == null) {
+ throw new IOException("Failed to list files for dir: " + file)
+ }
+ files
+ } else {
+ List()
+ }
+ }
+
+ if (file != null) {
+ try {
+ if (file.isDirectory) {
+ var savedIOException: IOException = null
+ for (child <- listFilesSafely(file)) {
+ try {
+ deleteRecursively(child)
+ } catch {
+ // In case of multiple exceptions, only the last one will be thrown
+ case ioe: IOException => savedIOException = ioe
+ }
+ }
+ if (savedIOException != null) {
+ throw savedIOException
+ }
+ }
+ } finally {
+ if (!file.delete()) {
+ // Delete can also fail if the file simply did not exist
+ if (file.exists()) {
+ throw new IOException("Failed to delete: " + file.getAbsolutePath)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * This function generates a random map(string, int) of a given size.
+ */
+ private[avro] def generateRandomMap(rand: Random, size: Int): java.util.Map[String, Int] = {
+ val jMap = new util.HashMap[String, Int]()
+ for (i <- 0 until size) {
+ jMap.put(rand.nextString(5), i)
+ }
+ jMap
+ }
+
+ /**
+ * This function generates a random array of booleans of a given size.
+ */
+ private[avro] def generateRandomArray(rand: Random, size: Int): util.ArrayList[Boolean] = {
+ val vec = new util.ArrayList[Boolean]()
+ for (i <- 0 until size) {
+ vec.add(rand.nextBoolean())
+ }
+ vec
+ }
+
+ /**
+ * This function generates a random ByteBuffer of a given size.
+ */
+ private[avro] def generateRandomByteBuffer(rand: Random, size: Int): ByteBuffer = {
+ val bb = ByteBuffer.allocate(size)
+ val arrayOfBytes = new Array[Byte](size)
+ rand.nextBytes(arrayOfBytes)
+ bb.put(arrayOfBytes)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6dee6fc..0392923 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
<module>external/kafka-0-10</module>
<module>external/kafka-0-10-assembly</module>
<module>external/kafka-0-10-sql</module>
+ <module>external/avro</module>
<!-- See additional modules enabled by profiles below -->
</modules>
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f887e45..247b6fe 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -40,8 +40,8 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
- val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010) = Seq(
- "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
+ val sqlProjects@Seq(catalyst, sql, hive, hiveThriftServer, sqlKafka010, avro) = Seq(
+ "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10", "avro"
).map(ProjectRef(buildLocation, _))
val streamingProjects@Seq(streaming, streamingKafka010) =
@@ -326,7 +326,7 @@ object SparkBuild extends PomBuild {
val mimaProjects = allProjects.filterNot { x =>
Seq(
spark, hive, hiveThriftServer, catalyst, repl, networkCommon, networkShuffle, networkYarn,
- unsafe, tags, sqlKafka010, kvstore
+ unsafe, tags, sqlKafka010, kvstore, avro
).contains(x)
}
@@ -688,9 +688,11 @@ object Unidoc {
publish := {},
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes,
+ yarn, tags, streamingKafka010, sqlKafka010, avro),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
+ inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes,
+ yarn, tags, streamingKafka010, sqlKafka010, avro),
unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
[2/2] spark git commit: [SPARK-24768][SQL] Have a built-in AVRO data source implementation
Posted by li...@apache.org.
[SPARK-24768][SQL] Have a built-in AVRO data source implementation
## What changes were proposed in this pull request?
Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystems, especially for Kafka-based data pipelines. With the external package https://github.com/databricks/spark-avro, Spark SQL can already read and write Avro data. Making spark-avro built-in provides a better experience for first-time users of Spark SQL and Structured Streaming, and we expect the built-in Avro data source to further improve the adoption of Structured Streaming.
The proposal is to inline the code from the spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.
[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)
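With this patch, Avro becomes a data source registered under the short name "avro" (see the DataSourceRegister service entry and AvroFileFormat.shortName() in the diff), and the `.avro(...)` reader/writer helpers used throughout AvroSuite appear to come from the new package object (package.scala), so importing `org.apache.spark.sql.avro._` is assumed to bring them into scope. A minimal usage sketch under those assumptions, with hypothetical local paths:

```scala
import org.apache.spark.sql.SparkSession
// Assumed import for the implicit .avro(...) read/write helpers from the new package object.
import org.apache.spark.sql.avro._

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("BuiltInAvroExample")
  .getOrCreate()

// Read through the generic reader using the registered short name "avro" ...
val df = spark.read.format("avro").load("/tmp/users.avro")

// ... or through the implicit helper, as the tests in AvroSuite do.
val df2 = spark.read.avro("/tmp/users.avro")

// Write Avro output; the codec setting is consumed in AvroFileFormat.prepareWrite().
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
df.write.avro("/tmp/users-deflate")
```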
## How was this patch tested?
Unit test
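The new AvroSuite, SerializableConfigurationSuite, and TestUtils files below make up the module's test coverage. Since dev/sparktestsupport/modules.py registers the module with the sbt test goal `avro/test`, the suites should be runnable with something like `build/sbt "avro/test"` from the repository root (the exact invocation is an assumption here, not part of this patch).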
Author: Gengliang Wang <ge...@databricks.com>
Closes #21742 from gengliangwang/export_avro.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/395860a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/395860a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/395860a9
Branch: refs/heads/master
Commit: 395860a986987886df6d60fd9b26afd818b2cb39
Parents: 1055c94
Author: Gengliang Wang <ge...@databricks.com>
Authored: Thu Jul 12 13:55:25 2018 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Thu Jul 12 13:55:25 2018 -0700
----------------------------------------------------------------------
dev/run-tests.py | 2 +-
dev/sparktestsupport/modules.py | 10 +
external/avro/pom.xml | 73 ++
....apache.spark.sql.sources.DataSourceRegister | 1 +
.../apache/spark/sql/avro/AvroFileFormat.scala | 289 +++++++
.../spark/sql/avro/AvroOutputWriter.scala | 164 ++++
.../sql/avro/AvroOutputWriterFactory.scala | 38 +
.../spark/sql/avro/SchemaConverters.scala | 406 ++++++++++
.../org/apache/spark/sql/avro/package.scala | 39 +
external/avro/src/test/resources/episodes.avro | Bin 0 -> 597 bytes
.../avro/src/test/resources/log4j.properties | 49 ++
.../test-random-partitioned/part-r-00000.avro | Bin 0 -> 1768 bytes
.../test-random-partitioned/part-r-00001.avro | Bin 0 -> 2313 bytes
.../test-random-partitioned/part-r-00002.avro | Bin 0 -> 1621 bytes
.../test-random-partitioned/part-r-00003.avro | Bin 0 -> 2117 bytes
.../test-random-partitioned/part-r-00004.avro | Bin 0 -> 3282 bytes
.../test-random-partitioned/part-r-00005.avro | Bin 0 -> 1550 bytes
.../test-random-partitioned/part-r-00006.avro | Bin 0 -> 1729 bytes
.../test-random-partitioned/part-r-00007.avro | Bin 0 -> 1897 bytes
.../test-random-partitioned/part-r-00008.avro | Bin 0 -> 3420 bytes
.../test-random-partitioned/part-r-00009.avro | Bin 0 -> 1796 bytes
.../test-random-partitioned/part-r-00010.avro | Bin 0 -> 3872 bytes
external/avro/src/test/resources/test.avro | Bin 0 -> 1365 bytes
external/avro/src/test/resources/test.avsc | 53 ++
external/avro/src/test/resources/test.json | 42 +
.../org/apache/spark/sql/avro/AvroSuite.scala | 812 +++++++++++++++++++
.../avro/SerializableConfigurationSuite.scala | 50 ++
.../org/apache/spark/sql/avro/TestUtils.scala | 156 ++++
pom.xml | 1 +
project/SparkBuild.scala | 12 +-
30 files changed, 2191 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/dev/run-tests.py
----------------------------------------------------------------------
diff --git a/dev/run-tests.py b/dev/run-tests.py
index cd45908..d9d3789 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -110,7 +110,7 @@ def determine_modules_to_test(changed_modules):
['graphx', 'examples']
>>> x = [x.name for x in determine_modules_to_test([modules.sql])]
>>> x # doctest: +NORMALIZE_WHITESPACE
- ['sql', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
+ ['sql', 'avro', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
'pyspark-sql', 'repl', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
"""
modules_to_test = set()
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/dev/sparktestsupport/modules.py
----------------------------------------------------------------------
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index dfea762..2aa3555 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -170,6 +170,16 @@ hive_thriftserver = Module(
]
)
+avro = Module(
+ name="avro",
+ dependencies=[sql],
+ source_file_regexes=[
+ "external/avro",
+ ],
+ sbt_test_goals=[
+ "avro/test",
+ ]
+)
sql_kafka = Module(
name="sql-kafka-0-10",
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/pom.xml
----------------------------------------------------------------------
diff --git a/external/avro/pom.xml b/external/avro/pom.xml
new file mode 100644
index 0000000..42e865b
--- /dev/null
+++ b/external/avro/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.11</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>spark-sql-avro_2.11</artifactId>
+ <properties>
+ <sbt.project.name>avro</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Avro</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-tags_${scala.binary.version}</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..95835f0
--- /dev/null
+++ b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.avro.AvroFileFormat
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
new file mode 100755
index 0000000..46e5a18
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.net.URI
+import java.util.zip.Deflater
+
+import scala.util.control.NonFatal
+
+import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.file.{DataFileConstants, DataFileReader}
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
+import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapreduce.AvroJob
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroFileFormat extends FileFormat with DataSourceRegister {
+ private val log = LoggerFactory.getLogger(getClass)
+
+ override def equals(other: Any): Boolean = other match {
+ case _: AvroFileFormat => true
+ case _ => false
+ }
+
+ // Dummy hashCode() to appease ScalaStyle.
+ override def hashCode(): Int = super.hashCode()
+
+ override def inferSchema(
+ spark: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ val conf = spark.sparkContext.hadoopConfiguration
+
+ // Schema evolution is not supported yet. Here we only pick a single random sample file to
+ // figure out the schema of the whole dataset.
+ val sampleFile =
+ if (conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true)) {
+ files.find(_.getPath.getName.endsWith(".avro")).getOrElse {
+ throw new FileNotFoundException(
+ "No Avro files found. Hadoop option \"avro.mapred.ignore.inputs.without.extension\" " +
+ " is set to true. Do all input files have \".avro\" extension?"
+ )
+ }
+ } else {
+ files.headOption.getOrElse {
+ throw new FileNotFoundException("No Avro files found.")
+ }
+ }
+
+ // User can specify an optional avro json schema.
+ val avroSchema = options.get(AvroFileFormat.AvroSchema)
+ .map(new Schema.Parser().parse)
+ .getOrElse {
+ val in = new FsInput(sampleFile.getPath, conf)
+ try {
+ val reader = DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]())
+ try {
+ reader.getSchema
+ } finally {
+ reader.close()
+ }
+ } finally {
+ in.close()
+ }
+ }
+
+ SchemaConverters.toSqlType(avroSchema).dataType match {
+ case t: StructType => Some(t)
+ case _ => throw new RuntimeException(
+ s"""Avro schema cannot be converted to a Spark SQL StructType:
+ |
+ |${avroSchema.toString(true)}
+ |""".stripMargin)
+ }
+ }
+
+ override def shortName(): String = "avro"
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = true
+
+ override def prepareWrite(
+ spark: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val recordName = options.getOrElse("recordName", "topLevelRecord")
+ val recordNamespace = options.getOrElse("recordNamespace", "")
+ val build = SchemaBuilder.record(recordName).namespace(recordNamespace)
+ val outputAvroSchema = SchemaConverters.convertStructToAvro(dataSchema, build, recordNamespace)
+
+ AvroJob.setOutputKeySchema(job, outputAvroSchema)
+ val AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec"
+ val AVRO_DEFLATE_LEVEL = "spark.sql.avro.deflate.level"
+ val COMPRESS_KEY = "mapred.output.compress"
+
+ spark.conf.get(AVRO_COMPRESSION_CODEC, "snappy") match {
+ case "uncompressed" =>
+ log.info("writing uncompressed Avro records")
+ job.getConfiguration.setBoolean(COMPRESS_KEY, false)
+
+ case "snappy" =>
+ log.info("compressing Avro output using Snappy")
+ job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+ job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.SNAPPY_CODEC)
+
+ case "deflate" =>
+ val deflateLevel = spark.conf.get(
+ AVRO_DEFLATE_LEVEL, Deflater.DEFAULT_COMPRESSION.toString).toInt
+ log.info(s"compressing Avro output using deflate (level=$deflateLevel)")
+ job.getConfiguration.setBoolean(COMPRESS_KEY, true)
+ job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, DataFileConstants.DEFLATE_CODEC)
+ job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+
+ case unknown: String =>
+ log.error(s"unsupported compression codec $unknown")
+ }
+
+ new AvroOutputWriterFactory(dataSchema, recordName, recordNamespace)
+ }
+
+ override def buildReader(
+ spark: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+
+ val broadcastedConf =
+ spark.sparkContext.broadcast(new AvroFileFormat.SerializableConfiguration(hadoopConf))
+
+ (file: PartitionedFile) => {
+ val log = LoggerFactory.getLogger(classOf[AvroFileFormat])
+ val conf = broadcastedConf.value.value
+ val userProvidedSchema = options.get(AvroFileFormat.AvroSchema).map(new Schema.Parser().parse)
+
+ // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
+ // Doing input file filtering is improper because we may generate empty tasks that process no
+ // input files but stress the scheduler. We should probably add a more general input file
+ // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
+ if (
+ conf.getBoolean(AvroFileFormat.IgnoreFilesWithoutExtensionProperty, true) &&
+ !file.filePath.endsWith(".avro")
+ ) {
+ Iterator.empty
+ } else {
+ val reader = {
+ val in = new FsInput(new Path(new URI(file.filePath)), conf)
+ try {
+ val datumReader = userProvidedSchema match {
+ case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
+ case _ => new GenericDatumReader[GenericRecord]()
+ }
+ DataFileReader.openReader(in, datumReader)
+ } catch {
+ case NonFatal(e) =>
+ log.error("Exception while opening DataFileReader", e)
+ in.close()
+ throw e
+ }
+ }
+
+ // Ensure that the reader is closed even if the task fails or doesn't consume the entire
+ // iterator of records.
+ Option(TaskContext.get()).foreach { taskContext =>
+ taskContext.addTaskCompletionListener { _ =>
+ reader.close()
+ }
+ }
+
+ reader.sync(file.start)
+ val stop = file.start + file.length
+
+ val rowConverter = SchemaConverters.createConverterToSQL(
+ userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
+
+ new Iterator[InternalRow] {
+ // Used to convert `Row`s containing data columns into `InternalRow`s.
+ private val encoderForDataColumns = RowEncoder(requiredSchema)
+
+ private[this] var completed = false
+
+ override def hasNext: Boolean = {
+ if (completed) {
+ false
+ } else {
+ val r = reader.hasNext && !reader.pastSync(stop)
+ if (!r) {
+ reader.close()
+ completed = true
+ }
+ r
+ }
+ }
+
+ override def next(): InternalRow = {
+ if (reader.pastSync(stop)) {
+ throw new NoSuchElementException("next on empty iterator")
+ }
+ val record = reader.next()
+ val safeDataRow = rowConverter(record).asInstanceOf[GenericRow]
+
+ // The safeDataRow is reused, so we must make a copy
+ encoderForDataColumns.toRow(safeDataRow)
+ }
+ }
+ }
+ }
+ }
+}
+
+private[avro] object AvroFileFormat {
+ val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
+
+ val AvroSchema = "avroSchema"
+
+ class SerializableConfiguration(@transient var value: Configuration)
+ extends Serializable with KryoSerializable {
+ @transient private[avro] lazy val log = LoggerFactory.getLogger(getClass)
+
+ private def writeObject(out: ObjectOutputStream): Unit = tryOrIOException {
+ out.defaultWriteObject()
+ value.write(out)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = tryOrIOException {
+ value = new Configuration(false)
+ value.readFields(in)
+ }
+
+ private def tryOrIOException[T](block: => T): T = {
+ try {
+ block
+ } catch {
+ case e: IOException =>
+ log.error("Exception encountered", e)
+ throw e
+ case NonFatal(e) =>
+ log.error("Exception encountered", e)
+ throw new IOException(e)
+ }
+ }
+
+ def write(kryo: Kryo, out: Output): Unit = {
+ val dos = new DataOutputStream(out)
+ value.write(dos)
+ dos.flush()
+ }
+
+ def read(kryo: Kryo, in: Input): Unit = {
+ value = new Configuration(false)
+ value.readFields(new DataInputStream(in))
+ }
+ }
+}
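
For reference, a minimal sketch of how the reader options handled above can be exercised from
user code. The "avroSchema" option name and the Hadoop property come from the AvroFileFormat
object; the SparkSession setup, reader schema, and input path are illustrative assumptions only.

    import org.apache.avro.SchemaBuilder
    import org.apache.spark.sql.SparkSession

    object AvroReadOptionsExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("example").getOrCreate()

        // Also read files that do not end in ".avro" (the property defaults to true above).
        spark.sparkContext.hadoopConfiguration
          .setBoolean("avro.mapred.ignore.inputs.without.extension", false)

        // Supply an explicit reader schema instead of relying on the one embedded in the files.
        val readerSchema = SchemaBuilder.record("test_schema").fields()
          .requiredString("string")
          .endRecord()

        val df = spark.read
          .format("avro")
          .option("avroSchema", readerSchema.toString)
          .load("/tmp/avro-input")  // hypothetical input path

        df.show()
      }
    }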
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
new file mode 100644
index 0000000..830bf3c
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.io.{IOException, OutputStream}
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.util.HashMap
+
+import scala.collection.immutable.Map
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericRecord
+import org.apache.avro.mapred.AvroKey
+import org.apache.avro.mapreduce.AvroKeyOutputFormat
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.types._
+
+// NOTE: This class is instantiated and used on the executor side only, so it does not need to
+// be serializable.
+private[avro] class AvroOutputWriter(
+ path: String,
+ context: TaskAttemptContext,
+ schema: StructType,
+ recordName: String,
+ recordNamespace: String) extends OutputWriter {
+
+ private lazy val converter = createConverterToAvro(schema, recordName, recordNamespace)
+  // Copy of the old conversion logic after the API change in SPARK-19085.
+ private lazy val internalRowConverter =
+ CatalystTypeConverters.createToScalaConverter(schema).asInstanceOf[InternalRow => Row]
+
+  /**
+   * Overrides the two methods responsible for generating the output file and stream so that
+   * the data can be written to the correct partitioned location.
+   */
+ private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
+ new AvroKeyOutputFormat[GenericRecord]() {
+
+ override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
+ new Path(path)
+ }
+
+ @throws(classOf[IOException])
+ override def getAvroFileOutputStream(c: TaskAttemptContext): OutputStream = {
+ val path = getDefaultWorkFile(context, ".avro")
+ path.getFileSystem(context.getConfiguration).create(path)
+ }
+
+ }.getRecordWriter(context)
+
+ override def write(internalRow: InternalRow): Unit = {
+ val row = internalRowConverter(internalRow)
+ val key = new AvroKey(converter(row).asInstanceOf[GenericRecord])
+ recordWriter.write(key, NullWritable.get())
+ }
+
+ override def close(): Unit = recordWriter.close(context)
+
+  /**
+   * Constructs a converter function for the given Spark SQL data type. This is used when
+   * writing Avro records out to disk.
+   */
+ private def createConverterToAvro(
+ dataType: DataType,
+ structName: String,
+ recordNamespace: String): (Any) => Any = {
+ dataType match {
+ case BinaryType => (item: Any) => item match {
+ case null => null
+ case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
+ }
+ case ByteType | ShortType | IntegerType | LongType |
+ FloatType | DoubleType | StringType | BooleanType => identity
+ case _: DecimalType => (item: Any) => if (item == null) null else item.toString
+ case TimestampType => (item: Any) =>
+ if (item == null) null else item.asInstanceOf[Timestamp].getTime
+ case DateType => (item: Any) =>
+ if (item == null) null else item.asInstanceOf[Date].getTime
+ case ArrayType(elementType, _) =>
+ val elementConverter = createConverterToAvro(
+ elementType,
+ structName,
+ SchemaConverters.getNewRecordNamespace(elementType, recordNamespace, structName))
+ (item: Any) => {
+ if (item == null) {
+ null
+ } else {
+ val sourceArray = item.asInstanceOf[Seq[Any]]
+ val sourceArraySize = sourceArray.size
+ val targetArray = new Array[Any](sourceArraySize)
+ var idx = 0
+ while (idx < sourceArraySize) {
+ targetArray(idx) = elementConverter(sourceArray(idx))
+ idx += 1
+ }
+ targetArray
+ }
+ }
+ case MapType(StringType, valueType, _) =>
+ val valueConverter = createConverterToAvro(
+ valueType,
+ structName,
+ SchemaConverters.getNewRecordNamespace(valueType, recordNamespace, structName))
+ (item: Any) => {
+ if (item == null) {
+ null
+ } else {
+ val javaMap = new HashMap[String, Any]()
+ item.asInstanceOf[Map[String, Any]].foreach { case (key, value) =>
+ javaMap.put(key, valueConverter(value))
+ }
+ javaMap
+ }
+ }
+ case structType: StructType =>
+ val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
+ val schema: Schema = SchemaConverters.convertStructToAvro(
+ structType, builder, recordNamespace)
+ val fieldConverters = structType.fields.map(field =>
+ createConverterToAvro(
+ field.dataType,
+ field.name,
+ SchemaConverters.getNewRecordNamespace(field.dataType, recordNamespace, field.name)))
+ (item: Any) => {
+ if (item == null) {
+ null
+ } else {
+ val record = new Record(schema)
+ val convertersIterator = fieldConverters.iterator
+ val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
+ val rowIterator = item.asInstanceOf[Row].toSeq.iterator
+
+ while (convertersIterator.hasNext) {
+ val converter = convertersIterator.next()
+ record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
+ }
+ record
+ }
+ }
+ }
+ }
+}
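
For illustration, a small sketch of the value conversions applied by createConverterToAvro above:
Timestamp and Date columns are stored as Avro longs (epoch milliseconds) and DecimalType values
as strings, so reading the data back yields LongType and StringType. The SparkSession setup and
output path are assumptions.

    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.SparkSession

    object AvroWriteTypesExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("example").getOrCreate()
        import spark.implicits._

        val df = Seq(
          (new Timestamp(1531425330000L), new Date(1531425330000L), BigDecimal("3.14"))
        ).toDF("ts", "d", "dec")

        // Timestamps/dates are written as epoch millis and decimals as strings (see the
        // TimestampType, DateType and DecimalType cases in createConverterToAvro).
        df.write.format("avro").save("/tmp/avro-types")  // hypothetical output path

        // The round-tripped schema therefore reports ts/d as LongType and dec as StringType.
        spark.read.format("avro").load("/tmp/avro-types").printSchema()
      }
    }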
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
new file mode 100644
index 0000000..5b2ce7d
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.types.StructType
+
+private[avro] class AvroOutputWriterFactory(
+ schema: StructType,
+ recordName: String,
+ recordNamespace: String) extends OutputWriterFactory {
+
+ override def getFileExtension(context: TaskAttemptContext): String = ".avro"
+
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new AvroOutputWriter(path, context, schema, recordName, recordNamespace)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
new file mode 100644
index 0000000..01f8c74
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters._
+
+import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.avro.Schema.Type._
+import org.apache.avro.SchemaBuilder._
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.generic.GenericFixed
+
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.types._
+
+/**
+ * This object contains methods that are used to convert Spark SQL schemas to Avro schemas and
+ * vice versa.
+ */
+object SchemaConverters {
+
+ class IncompatibleSchemaException(msg: String, ex: Throwable = null) extends Exception(msg, ex)
+
+ case class SchemaType(dataType: DataType, nullable: Boolean)
+
+  /**
+   * This function takes an Avro schema and returns a Spark SQL schema.
+   */
+ def toSqlType(avroSchema: Schema): SchemaType = {
+ avroSchema.getType match {
+ case INT => SchemaType(IntegerType, nullable = false)
+ case STRING => SchemaType(StringType, nullable = false)
+ case BOOLEAN => SchemaType(BooleanType, nullable = false)
+ case BYTES => SchemaType(BinaryType, nullable = false)
+ case DOUBLE => SchemaType(DoubleType, nullable = false)
+ case FLOAT => SchemaType(FloatType, nullable = false)
+ case LONG => SchemaType(LongType, nullable = false)
+ case FIXED => SchemaType(BinaryType, nullable = false)
+ case ENUM => SchemaType(StringType, nullable = false)
+
+ case RECORD =>
+ val fields = avroSchema.getFields.asScala.map { f =>
+ val schemaType = toSqlType(f.schema())
+ StructField(f.name, schemaType.dataType, schemaType.nullable)
+ }
+
+ SchemaType(StructType(fields), nullable = false)
+
+ case ARRAY =>
+ val schemaType = toSqlType(avroSchema.getElementType)
+ SchemaType(
+ ArrayType(schemaType.dataType, containsNull = schemaType.nullable),
+ nullable = false)
+
+ case MAP =>
+ val schemaType = toSqlType(avroSchema.getValueType)
+ SchemaType(
+ MapType(StringType, schemaType.dataType, valueContainsNull = schemaType.nullable),
+ nullable = false)
+
+ case UNION =>
+ if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+ // In case of a union with null, eliminate it and make a recursive call
+ val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+ if (remainingUnionTypes.size == 1) {
+ toSqlType(remainingUnionTypes.head).copy(nullable = true)
+ } else {
+ toSqlType(Schema.createUnion(remainingUnionTypes.asJava)).copy(nullable = true)
+ }
+ } else avroSchema.getTypes.asScala.map(_.getType) match {
+ case Seq(t1) =>
+ toSqlType(avroSchema.getTypes.get(0))
+ case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
+ SchemaType(LongType, nullable = false)
+ case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
+ SchemaType(DoubleType, nullable = false)
+ case _ =>
+ // Convert complex unions to struct types where field names are member0, member1, etc.
+ // This is consistent with the behavior when converting between Avro and Parquet.
+ val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
+ case (s, i) =>
+ val schemaType = toSqlType(s)
+ // All fields are nullable because only one of them is set at a time
+ StructField(s"member$i", schemaType.dataType, nullable = true)
+ }
+
+ SchemaType(StructType(fields), nullable = false)
+ }
+
+ case other => throw new IncompatibleSchemaException(s"Unsupported type $other")
+ }
+ }
+
+  /**
+   * This function converts a Spark SQL StructType into an Avro schema. It delegates to two
+   * other converter methods to do the conversion.
+   */
+ def convertStructToAvro[T](
+ structType: StructType,
+ schemaBuilder: RecordBuilder[T],
+ recordNamespace: String): T = {
+ val fieldsAssembler: FieldAssembler[T] = schemaBuilder.fields()
+ structType.fields.foreach { field =>
+ val newField = fieldsAssembler.name(field.name).`type`()
+
+ if (field.nullable) {
+ convertFieldTypeToAvro(field.dataType, newField.nullable(), field.name, recordNamespace)
+ .noDefault
+ } else {
+ convertFieldTypeToAvro(field.dataType, newField, field.name, recordNamespace)
+ .noDefault
+ }
+ }
+ fieldsAssembler.endRecord()
+ }
+
+  /**
+   * Returns a converter function that converts a row in Avro format to a Catalyst GenericRow.
+   *
+   * @param sourceAvroSchema Source schema before conversion, either inferred from the Avro file
+   *                         or passed in by the user.
+   * @param targetSqlType Target Catalyst SQL type after the conversion.
+   * @return A converter function that converts a row in Avro format to a Catalyst GenericRow.
+   */
+ private[avro] def createConverterToSQL(
+ sourceAvroSchema: Schema,
+ targetSqlType: DataType): AnyRef => AnyRef = {
+
+ def createConverter(avroSchema: Schema,
+ sqlType: DataType, path: List[String]): AnyRef => AnyRef = {
+ val avroType = avroSchema.getType
+ (sqlType, avroType) match {
+ // Avro strings are in Utf8, so we have to call toString on them
+ case (StringType, STRING) | (StringType, ENUM) =>
+ (item: AnyRef) => item.toString
+        case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) |
+             (FloatType, FLOAT) | (LongType, LONG) =>
+          identity
+ case (TimestampType, LONG) =>
+ (item: AnyRef) => new Timestamp(item.asInstanceOf[Long])
+ case (DateType, LONG) =>
+ (item: AnyRef) => new Date(item.asInstanceOf[Long])
+        // Byte arrays are reused by Avro, so we have to make a copy of them.
+        case (BinaryType, FIXED) =>
+ (item: AnyRef) => item.asInstanceOf[GenericFixed].bytes().clone()
+ case (BinaryType, BYTES) =>
+ (item: AnyRef) =>
+ val byteBuffer = item.asInstanceOf[ByteBuffer]
+ val bytes = new Array[Byte](byteBuffer.remaining)
+ byteBuffer.get(bytes)
+ bytes
+ case (struct: StructType, RECORD) =>
+ val length = struct.fields.length
+ val converters = new Array[AnyRef => AnyRef](length)
+ val avroFieldIndexes = new Array[Int](length)
+ var i = 0
+ while (i < length) {
+ val sqlField = struct.fields(i)
+ val avroField = avroSchema.getField(sqlField.name)
+ if (avroField != null) {
+ val converter = (item: AnyRef) => {
+ if (item == null) {
+ item
+ } else {
+ createConverter(avroField.schema, sqlField.dataType, path :+ sqlField.name)(item)
+ }
+ }
+ converters(i) = converter
+ avroFieldIndexes(i) = avroField.pos()
+ } else if (!sqlField.nullable) {
+ throw new IncompatibleSchemaException(
+ s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
+ "in Avro schema\n" +
+ s"Source Avro schema: $sourceAvroSchema.\n" +
+ s"Target Catalyst type: $targetSqlType")
+ }
+ i += 1
+ }
+
+ (item: AnyRef) =>
+ val record = item.asInstanceOf[GenericRecord]
+ val result = new Array[Any](length)
+ var i = 0
+ while (i < converters.length) {
+ if (converters(i) != null) {
+ val converter = converters(i)
+ result(i) = converter(record.get(avroFieldIndexes(i)))
+ }
+ i += 1
+ }
+ new GenericRow(result)
+ case (arrayType: ArrayType, ARRAY) =>
+ val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType,
+ path)
+ val allowsNull = arrayType.containsNull
+ (item: AnyRef) =>
+ item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element =>
+ if (element == null && !allowsNull) {
+ throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " +
+ "allowed to be null")
+ } else {
+ elementConverter(element)
+ }
+ }
+ case (mapType: MapType, MAP) if mapType.keyType == StringType =>
+ val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path)
+ val allowsNull = mapType.valueContainsNull
+ (item: AnyRef) =>
+ item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { case (k, v) =>
+ if (v == null && !allowsNull) {
+ throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " +
+ "allowed to be null")
+ } else {
+ (k.toString, valueConverter(v))
+ }
+ }.toMap
+ case (sqlType, UNION) =>
+ if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
+ val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
+ if (remainingUnionTypes.size == 1) {
+ createConverter(remainingUnionTypes.head, sqlType, path)
+ } else {
+ createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
+ }
+ } else avroSchema.getTypes.asScala.map(_.getType) match {
+ case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
+ case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
+ (item: AnyRef) =>
+ item match {
+ case l: java.lang.Long => l
+ case i: java.lang.Integer => new java.lang.Long(i.longValue())
+ }
+ case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType =>
+ (item: AnyRef) =>
+ item match {
+ case d: java.lang.Double => d
+ case f: java.lang.Float => new java.lang.Double(f.doubleValue())
+ }
+ case other =>
+ sqlType match {
+ case t: StructType if t.fields.length == avroSchema.getTypes.size =>
+ val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map {
+ case (field, schema) =>
+ createConverter(schema, field.dataType, path :+ field.name)
+ }
+ (item: AnyRef) =>
+ val i = GenericData.get().resolveUnion(avroSchema, item)
+ val converted = new Array[Any](fieldConverters.length)
+ converted(i) = fieldConverters(i)(item)
+ new GenericRow(converted)
+ case _ => throw new IncompatibleSchemaException(
+ s"Cannot convert Avro schema to catalyst type because schema at path " +
+ s"${path.mkString(".")} is not compatible " +
+ s"(avroType = $other, sqlType = $sqlType). \n" +
+ s"Source Avro schema: $sourceAvroSchema.\n" +
+ s"Target Catalyst type: $targetSqlType")
+ }
+ }
+ case (left, right) =>
+ throw new IncompatibleSchemaException(
+ s"Cannot convert Avro schema to catalyst type because schema at path " +
+ s"${path.mkString(".")} is not compatible (avroType = $right, sqlType = $left). \n" +
+ s"Source Avro schema: $sourceAvroSchema.\n" +
+ s"Target Catalyst type: $targetSqlType")
+ }
+ }
+ createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
+ }
+
+  /**
+   * This function is used to convert some Spark SQL type to an Avro type. Note that this
+   * function is not used to construct the fields of an Avro record (convertFieldTypeToAvro is
+   * used for that).
+   */
+ private def convertTypeToAvro[T](
+ dataType: DataType,
+ schemaBuilder: BaseTypeBuilder[T],
+ structName: String,
+ recordNamespace: String): T = {
+ dataType match {
+ case ByteType => schemaBuilder.intType()
+ case ShortType => schemaBuilder.intType()
+ case IntegerType => schemaBuilder.intType()
+ case LongType => schemaBuilder.longType()
+ case FloatType => schemaBuilder.floatType()
+ case DoubleType => schemaBuilder.doubleType()
+ case _: DecimalType => schemaBuilder.stringType()
+ case StringType => schemaBuilder.stringType()
+ case BinaryType => schemaBuilder.bytesType()
+ case BooleanType => schemaBuilder.booleanType()
+ case TimestampType => schemaBuilder.longType()
+ case DateType => schemaBuilder.longType()
+
+ case ArrayType(elementType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+ val elementSchema = convertTypeToAvro(elementType, builder, structName, recordNamespace)
+ schemaBuilder.array().items(elementSchema)
+
+ case MapType(StringType, valueType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+ val valueSchema = convertTypeToAvro(valueType, builder, structName, recordNamespace)
+ schemaBuilder.map().values(valueSchema)
+
+ case structType: StructType =>
+ convertStructToAvro(
+ structType,
+ schemaBuilder.record(structName).namespace(recordNamespace),
+ recordNamespace)
+
+ case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+ }
+ }
+
+  /**
+   * This function is used to construct the fields of an Avro record, where the schema of each
+   * field is given by the Avro representation of dataType. Since builders for record fields are
+   * different from those for everything else, we have to use a separate method.
+   */
+ private def convertFieldTypeToAvro[T](
+ dataType: DataType,
+ newFieldBuilder: BaseFieldTypeBuilder[T],
+ structName: String,
+ recordNamespace: String): FieldDefault[T, _] = {
+ dataType match {
+ case ByteType => newFieldBuilder.intType()
+ case ShortType => newFieldBuilder.intType()
+ case IntegerType => newFieldBuilder.intType()
+ case LongType => newFieldBuilder.longType()
+ case FloatType => newFieldBuilder.floatType()
+ case DoubleType => newFieldBuilder.doubleType()
+ case _: DecimalType => newFieldBuilder.stringType()
+ case StringType => newFieldBuilder.stringType()
+ case BinaryType => newFieldBuilder.bytesType()
+ case BooleanType => newFieldBuilder.booleanType()
+ case TimestampType => newFieldBuilder.longType()
+ case DateType => newFieldBuilder.longType()
+
+ case ArrayType(elementType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[ArrayType].containsNull)
+ val elementSchema = convertTypeToAvro(
+ elementType,
+ builder,
+ structName,
+ getNewRecordNamespace(elementType, recordNamespace, structName))
+ newFieldBuilder.array().items(elementSchema)
+
+ case MapType(StringType, valueType, _) =>
+ val builder = getSchemaBuilder(dataType.asInstanceOf[MapType].valueContainsNull)
+ val valueSchema = convertTypeToAvro(
+ valueType,
+ builder,
+ structName,
+ getNewRecordNamespace(valueType, recordNamespace, structName))
+ newFieldBuilder.map().values(valueSchema)
+
+ case structType: StructType =>
+ convertStructToAvro(
+ structType,
+ newFieldBuilder.record(structName).namespace(s"$recordNamespace.$structName"),
+ s"$recordNamespace.$structName")
+
+ case other => throw new IncompatibleSchemaException(s"Unexpected type $dataType.")
+ }
+ }
+
+  /**
+   * Returns a new namespace depending on the data type of the element.
+   * If the data type is a StructType, it returns the current namespace concatenated with the
+   * element name; otherwise it returns the current namespace unchanged.
+   */
+ private[avro] def getNewRecordNamespace(
+ elementDataType: DataType,
+ currentRecordNamespace: String,
+ elementName: String): String = {
+
+ elementDataType match {
+ case StructType(_) => s"$currentRecordNamespace.$elementName"
+ case _ => currentRecordNamespace
+ }
+ }
+
+ private def getSchemaBuilder(isNullable: Boolean): BaseTypeBuilder[Schema] = {
+ if (isNullable) {
+ SchemaBuilder.builder().nullable()
+ } else {
+ SchemaBuilder.builder()
+ }
+ }
+}
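
A short sketch of how toSqlType above resolves Avro unions; the schema literal here is made up
purely for illustration.

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    object SchemaConversionExample {
      def main(args: Array[String]): Unit = {
        val avroSchema = new Schema.Parser().parse(
          """{"type": "record", "name": "example", "fields": [
            |  {"name": "id", "type": ["int", "long"]},
            |  {"name": "name", "type": ["null", "string"]}
            |]}""".stripMargin)

        // ["int", "long"] collapses to LongType and ["null", "string"] becomes a nullable
        // StringType, following the UNION handling in toSqlType above.
        println(SchemaConverters.toSqlType(avroSchema))
      }
    }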
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
new file mode 100755
index 0000000..b3c8a66
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+package object avro {
+  /**
+   * Adds a method, `avro`, to DataFrameWriter that allows you to write Avro files using
+   * the DataFileWriter.
+   */
+ implicit class AvroDataFrameWriter[T](writer: DataFrameWriter[T]) {
+ def avro: String => Unit = writer.format("avro").save
+ }
+
+  /**
+   * Adds a method, `avro`, to DataFrameReader that allows you to read Avro files using
+   * the DataFileReader.
+   */
+ implicit class AvroDataFrameReader(reader: DataFrameReader) {
+ def avro: String => DataFrame = reader.format("avro").load
+
+ @scala.annotation.varargs
+ def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*)
+ }
+}
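
For completeness, a minimal sketch of the implicit shortcuts defined above. The input path reuses
the episodes.avro test resource from AvroSuite; the SparkSession setup and output path are
assumptions.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.avro._  // brings the avro() read/write shortcuts into scope

    object AvroShortcutExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("example").getOrCreate()

        val df = spark.read.avro("src/test/resources/episodes.avro")
        df.write.avro("/tmp/episodes-copy")  // hypothetical output path
      }
    }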
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/episodes.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/episodes.avro b/external/avro/src/test/resources/episodes.avro
new file mode 100644
index 0000000..58a028c
Binary files /dev/null and b/external/avro/src/test/resources/episodes.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/log4j.properties b/external/avro/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c18a724
--- /dev/null
+++ b/external/avro/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+#Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+#File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Set the logger level of the File Appender to INFO
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.parquet.hadoop.ParquetRecordReader=false
+log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
+
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro
new file mode 100755
index 0000000..fece892
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00000.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro
new file mode 100755
index 0000000..1ca623a
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00001.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro
new file mode 100755
index 0000000..a12e945
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00002.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro
new file mode 100755
index 0000000..60c0956
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00003.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro
new file mode 100755
index 0000000..af56dfc
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00004.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro
new file mode 100755
index 0000000..87d7844
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00005.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro
new file mode 100755
index 0000000..c326fc4
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00006.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro
new file mode 100755
index 0000000..279f36c
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00007.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro
new file mode 100755
index 0000000..8d70f5d
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00008.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro
new file mode 100755
index 0000000..6839d72
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00009.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro
new file mode 100755
index 0000000..aedc7f7
Binary files /dev/null and b/external/avro/src/test/resources/test-random-partitioned/part-r-00010.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.avro b/external/avro/src/test/resources/test.avro
new file mode 100644
index 0000000..6425e21
Binary files /dev/null and b/external/avro/src/test/resources/test.avro differ
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.avsc
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.avsc b/external/avro/src/test/resources/test.avsc
new file mode 100644
index 0000000..d7119a0
--- /dev/null
+++ b/external/avro/src/test/resources/test.avsc
@@ -0,0 +1,53 @@
+{
+ "type" : "record",
+ "name" : "test_schema",
+ "fields" : [{
+ "name" : "string",
+ "type" : "string",
+ "doc" : "Meaningless string of characters"
+ }, {
+ "name" : "simple_map",
+ "type" : {"type": "map", "values": "int"}
+ }, {
+ "name" : "complex_map",
+ "type" : {"type": "map", "values": {"type": "map", "values": "string"}}
+ }, {
+ "name" : "union_string_null",
+ "type" : ["null", "string"]
+ }, {
+ "name" : "union_int_long_null",
+ "type" : ["int", "long", "null"]
+ }, {
+ "name" : "union_float_double",
+ "type" : ["float", "double"]
+ }, {
+ "name": "fixed3",
+ "type": {"type": "fixed", "size": 3, "name": "fixed3"}
+ }, {
+ "name": "fixed2",
+ "type": {"type": "fixed", "size": 2, "name": "fixed2"}
+ }, {
+ "name": "enum",
+ "type": { "type": "enum",
+ "name": "Suit",
+ "symbols" : ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]
+ }
+ }, {
+ "name": "record",
+ "type": {
+ "type": "record",
+ "name": "record",
+ "aliases": ["RecordAlias"],
+ "fields" : [{
+ "name": "value_field",
+ "type": "string"
+ }]
+ }
+ }, {
+ "name": "array_of_boolean",
+ "type": {"type": "array", "items": "boolean"}
+ }, {
+ "name": "bytes",
+ "type": "bytes"
+ }]
+}
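
The unions, fixed, and enum fields in this schema exercise the conversions in SchemaConverters:
union_string_null maps to a nullable StringType, union_int_long_null to a nullable LongType,
union_float_double to DoubleType, fixed2/fixed3 to BinaryType, and enum to StringType. A quick
way to print the resulting Catalyst type (assuming the working directory is the external/avro
module, matching the relative paths used in AvroSuite):

    import java.io.File
    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    object TestSchemaTypes {
      def main(args: Array[String]): Unit = {
        val schema = new Schema.Parser().parse(new File("src/test/resources/test.avsc"))
        println(SchemaConverters.toSqlType(schema).dataType)
      }
    }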
http://git-wip-us.apache.org/repos/asf/spark/blob/395860a9/external/avro/src/test/resources/test.json
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/test.json b/external/avro/src/test/resources/test.json
new file mode 100644
index 0000000..780189a
--- /dev/null
+++ b/external/avro/src/test/resources/test.json
@@ -0,0 +1,42 @@
+{
+ "string": "OMG SPARK IS AWESOME",
+ "simple_map": {"abc": 1, "bcd": 7},
+ "complex_map": {"key": {"a": "b", "c": "d"}},
+ "union_string_null": {"string": "abc"},
+ "union_int_long_null": {"int": 1},
+ "union_float_double": {"float": 3.1415926535},
+ "fixed3":"\u0002\u0003\u0004",
+ "fixed2":"\u0011\u0012",
+ "enum": "SPADES",
+ "record": {"value_field": "Two things are infinite: the universe and human stupidity; and I'm not sure about universe."},
+ "array_of_boolean": [true, false, false],
+ "bytes": "\u0041\u0042\u0043"
+}
+{
+ "string": "Terran is IMBA!",
+ "simple_map": {"mmm": 0, "qqq": 66},
+ "complex_map": {"key": {"1": "2", "3": "4"}},
+ "union_string_null": {"string": "123"},
+ "union_int_long_null": {"long": 66},
+ "union_float_double": {"double": 6.6666666666666},
+ "fixed3":"\u0007\u0007\u0007",
+ "fixed2":"\u0001\u0002",
+ "enum": "CLUBS",
+ "record": {"value_field": "Life did not intend to make us perfect. Whoever is perfect belongs in a museum."},
+ "array_of_boolean": [],
+ "bytes": ""
+}
+{
+ "string": "The cake is a LIE!",
+ "simple_map": {},
+ "complex_map": {"key": {}},
+ "union_string_null": {"null": null},
+ "union_int_long_null": {"null": null},
+ "union_float_double": {"double": 0},
+ "fixed3":"\u0011\u0022\u0009",
+ "fixed2":"\u0010\u0090",
+ "enum": "DIAMONDS",
+ "record": {"value_field": "TEST_STR123"},
+ "array_of_boolean": [false],
+ "bytes": "\u0053"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org