Posted to commits@spark.apache.org by do...@apache.org on 2019/07/05 07:52:28 UTC
[spark] branch master updated: [SPARK-28218][SQL] Migrate Avro to File Data Source V2
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3663dbe [SPARK-28218][SQL] Migrate Avro to File Data Source V2
3663dbe is described below
commit 3663dbe541826949cecf5e1ea205fe35c163d147
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Fri Jul 5 00:52:03 2019 -0700
[SPARK-28218][SQL] Migrate Avro to File Data Source V2
## What changes were proposed in this pull request?
Migrate the Avro data source to File Data Source V2, keeping the existing AvroFileFormat as the V1 fallback.
## How was this patch tested?
Unit tests; the existing Avro suites are split into abstract bases with V1 and V2 concrete subclasses so both code paths are covered.
Closes #25017 from gengliangwang/avroV2.
Authored-by: Gengliang Wang <ge...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
...org.apache.spark.sql.sources.DataSourceRegister | 2 +-
.../org/apache/spark/sql/avro/AvroFileFormat.scala | 123 +----------
.../spark/sql/avro/AvroOutputWriterFactory.scala | 2 +-
.../avro/{AvroFileFormat.scala => AvroUtils.scala} | 230 ++++++---------------
.../spark/sql/v2/avro/AvroDataSourceV2.scala | 43 ++++
.../sql/v2/avro/AvroPartitionReaderFactory.scala | 127 ++++++++++++
.../org/apache/spark/sql/v2/avro/AvroScan.scala | 52 +++++
.../apache/spark/sql/v2/avro/AvroScanBuilder.scala | 36 ++++
.../org/apache/spark/sql/v2/avro/AvroTable.scala | 51 +++++
.../spark/sql/v2/avro/AvroWriteBuilder.scala | 41 ++++
.../spark/sql/avro/AvroLogicalTypeSuite.scala | 20 +-
.../org/apache/spark/sql/avro/AvroSuite.scala | 25 ++-
12 files changed, 458 insertions(+), 294 deletions(-)
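
Before the diff itself, a minimal usage sketch (assuming a running SparkSession named `spark`; the paths are placeholders): user-facing behavior is unchanged by this migration, since the source is still resolved through the short name "avro".

    // Reading and writing Avro works as before; only the implementation
    // behind the "avro" short name changes in this commit.
    val df = spark.read.format("avro").load("/tmp/events.avro")
    df.write.format("avro").mode("overwrite").save("/tmp/events-out")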
diff --git a/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 95835f0..d89f963 100644
--- a/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/external/avro/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1 @@
-org.apache.spark.sql.avro.AvroFileFormat
+org.apache.spark.sql.v2.avro.AvroDataSourceV2
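
Swapping this single service entry is enough because Spark discovers sources through java.util.ServiceLoader over DataSourceRegister and matches on shortName(). A simplified sketch of that lookup (Spark's real DataSource.lookupDataSource additionally handles fully qualified class names, backward-compatibility aliases, and the V1 fallback lists):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.DataSourceRegister

    // Find the registered source whose short name matches the provider string.
    def lookup(provider: String): Option[Class[_]] = {
      ServiceLoader.load(classOf[DataSourceRegister])
        .asScala
        .find(_.shortName().equalsIgnoreCase(provider))
        .map(_.getClass)
    }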
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
index be8223c..123669b 100755
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
@@ -23,25 +23,23 @@ import java.net.URI
import scala.util.control.NonFatal
import org.apache.avro.Schema
-import org.apache.avro.file.DataFileConstants._
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
-import org.apache.avro.mapreduce.AvroJob
+import org.apache.avro.mapred.FsInput
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
-private[avro] class AvroFileFormat extends FileFormat
+private[sql] class AvroFileFormat extends FileFormat
with DataSourceRegister with Logging with Serializable {
override def equals(other: Any): Boolean = other match {
@@ -56,74 +54,7 @@ private[avro] class AvroFileFormat extends FileFormat
spark: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
- val conf = spark.sessionState.newHadoopConf()
- if (options.contains("ignoreExtension")) {
- logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
- "general data source option pathGlobFilter for filtering file names.")
- }
- val parsedOptions = new AvroOptions(options, conf)
-
- // User can specify an optional avro json schema.
- val avroSchema = parsedOptions.schema
- .map(new Schema.Parser().parse)
- .getOrElse {
- inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
- spark.sessionState.conf.ignoreCorruptFiles)
- }
-
- SchemaConverters.toSqlType(avroSchema).dataType match {
- case t: StructType => Some(t)
- case _ => throw new RuntimeException(
- s"""Avro schema cannot be converted to a Spark SQL StructType:
- |
- |${avroSchema.toString(true)}
- |""".stripMargin)
- }
- }
-
- private def inferAvroSchemaFromFiles(
- files: Seq[FileStatus],
- conf: Configuration,
- ignoreExtension: Boolean,
- ignoreCorruptFiles: Boolean): Schema = {
- // Schema evolution is not supported yet. Here we only pick first random readable sample file to
- // figure out the schema of the whole dataset.
- val avroReader = files.iterator.map { f =>
- val path = f.getPath
- if (!ignoreExtension && !path.getName.endsWith(".avro")) {
- None
- } else {
- Utils.tryWithResource {
- new FsInput(path, conf)
- } { in =>
- try {
- Some(DataFileReader.openReader(in, new GenericDatumReader[GenericRecord]()))
- } catch {
- case e: IOException =>
- if (ignoreCorruptFiles) {
- logWarning(s"Skipped the footer in the corrupted file: $path", e)
- None
- } else {
- throw new SparkException(s"Could not read file: $path", e)
- }
- }
- }
- }
- }.collectFirst {
- case Some(reader) => reader
- }
-
- avroReader match {
- case Some(reader) =>
- try {
- reader.getSchema
- } finally {
- reader.close()
- }
- case None =>
- throw new FileNotFoundException(
- "No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
- }
+ AvroUtils.inferSchema(spark, options, files)
}
override def shortName(): String = "avro"
@@ -140,32 +71,7 @@ private[avro] class AvroFileFormat extends FileFormat
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
- val outputAvroSchema: Schema = parsedOptions.schema
- .map(new Schema.Parser().parse)
- .getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
- parsedOptions.recordName, parsedOptions.recordNamespace))
-
- AvroJob.setOutputKeySchema(job, outputAvroSchema)
-
- if (parsedOptions.compression == "uncompressed") {
- job.getConfiguration.setBoolean("mapred.output.compress", false)
- } else {
- job.getConfiguration.setBoolean("mapred.output.compress", true)
- logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
- val codec = parsedOptions.compression match {
- case DEFLATE_CODEC =>
- val deflateLevel = spark.sessionState.conf.avroDeflateLevel
- logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
- job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
- DEFLATE_CODEC
- case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
- case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
- }
- job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
- }
-
- new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
+ AvroUtils.prepareWrite(spark.sessionState.conf, job, options, dataSchema)
}
override def buildReader(
@@ -250,22 +156,7 @@ private[avro] class AvroFileFormat extends FileFormat
}
}
- override def supportDataType(dataType: DataType): Boolean = dataType match {
- case _: AtomicType => true
-
- case st: StructType => st.forall { f => supportDataType(f.dataType) }
-
- case ArrayType(elementType, _) => supportDataType(elementType)
-
- case MapType(keyType, valueType, _) =>
- supportDataType(keyType) && supportDataType(valueType)
-
- case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
-
- case _: NullType => true
-
- case _ => false
- }
+ override def supportDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)
}
private[avro] object AvroFileFormat {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
index 116020e..0074044 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriterFactory.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
* @param catalystSchema Catalyst schema of input data.
* @param avroSchemaAsJsonString Avro schema of output result, in JSON string format.
*/
-private[avro] class AvroOutputWriterFactory(
+private[sql] class AvroOutputWriterFactory(
catalystSchema: StructType,
avroSchemaAsJsonString: String) extends OutputWriterFactory {
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
old mode 100755
new mode 100644
similarity index 52%
copy from external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
copy to external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index be8223c..b978b79
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -14,45 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.spark.sql.avro
-import java.io._
-import java.net.URI
-
-import scala.util.control.NonFatal
+import java.io.{FileNotFoundException, IOException}
import org.apache.avro.Schema
-import org.apache.avro.file.DataFileConstants._
+import org.apache.avro.file.DataFileConstants.{BZIP2_CODEC, DEFLATE_CODEC, SNAPPY_CODEC, XZ_CODEC}
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
-import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-private[avro] class AvroFileFormat extends FileFormat
- with DataSourceRegister with Logging with Serializable {
-
- override def equals(other: Any): Boolean = other match {
- case _: AvroFileFormat => true
- case _ => false
- }
+import org.apache.spark.util.Utils
- // Dummy hashCode() to appease ScalaStyle.
- override def hashCode(): Int = super.hashCode()
-
- override def inferSchema(
+object AvroUtils extends Logging {
+ def inferSchema(
spark: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
@@ -69,7 +54,7 @@ private[avro] class AvroFileFormat extends FileFormat
.getOrElse {
inferAvroSchemaFromFiles(files, conf, parsedOptions.ignoreExtension,
spark.sessionState.conf.ignoreCorruptFiles)
- }
+ }
SchemaConverters.toSqlType(avroSchema).dataType match {
case t: StructType => Some(t)
@@ -81,6 +66,56 @@ private[avro] class AvroFileFormat extends FileFormat
}
}
+ def supportsDataType(dataType: DataType): Boolean = dataType match {
+ case _: AtomicType => true
+
+ case st: StructType => st.forall { f => supportsDataType(f.dataType) }
+
+ case ArrayType(elementType, _) => supportsDataType(elementType)
+
+ case MapType(keyType, valueType, _) =>
+ supportsDataType(keyType) && supportsDataType(valueType)
+
+ case udt: UserDefinedType[_] => supportsDataType(udt.sqlType)
+
+ case _: NullType => true
+
+ case _ => false
+ }
+
+ def prepareWrite(
+ sqlConf: SQLConf,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val parsedOptions = new AvroOptions(options, job.getConfiguration)
+ val outputAvroSchema: Schema = parsedOptions.schema
+ .map(new Schema.Parser().parse)
+ .getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
+ parsedOptions.recordName, parsedOptions.recordNamespace))
+
+ AvroJob.setOutputKeySchema(job, outputAvroSchema)
+
+ if (parsedOptions.compression == "uncompressed") {
+ job.getConfiguration.setBoolean("mapred.output.compress", false)
+ } else {
+ job.getConfiguration.setBoolean("mapred.output.compress", true)
+ logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
+ val codec = parsedOptions.compression match {
+ case DEFLATE_CODEC =>
+ val deflateLevel = sqlConf.avroDeflateLevel
+ logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
+ job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+ DEFLATE_CODEC
+ case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
+ case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
+ }
+ job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
+ }
+
+ new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
+ }
+
private def inferAvroSchemaFromFiles(
files: Seq[FileStatus],
conf: Configuration,
@@ -125,149 +160,4 @@ private[avro] class AvroFileFormat extends FileFormat
"No Avro files found. If files don't have .avro extension, set ignoreExtension to true")
}
}
-
- override def shortName(): String = "avro"
-
- override def toString(): String = "Avro"
-
- override def isSplitable(
- sparkSession: SparkSession,
- options: Map[String, String],
- path: Path): Boolean = true
-
- override def prepareWrite(
- spark: SparkSession,
- job: Job,
- options: Map[String, String],
- dataSchema: StructType): OutputWriterFactory = {
- val parsedOptions = new AvroOptions(options, spark.sessionState.newHadoopConf())
- val outputAvroSchema: Schema = parsedOptions.schema
- .map(new Schema.Parser().parse)
- .getOrElse(SchemaConverters.toAvroType(dataSchema, nullable = false,
- parsedOptions.recordName, parsedOptions.recordNamespace))
-
- AvroJob.setOutputKeySchema(job, outputAvroSchema)
-
- if (parsedOptions.compression == "uncompressed") {
- job.getConfiguration.setBoolean("mapred.output.compress", false)
- } else {
- job.getConfiguration.setBoolean("mapred.output.compress", true)
- logInfo(s"Compressing Avro output using the ${parsedOptions.compression} codec")
- val codec = parsedOptions.compression match {
- case DEFLATE_CODEC =>
- val deflateLevel = spark.sessionState.conf.avroDeflateLevel
- logInfo(s"Avro compression level $deflateLevel will be used for $DEFLATE_CODEC codec.")
- job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
- DEFLATE_CODEC
- case codec @ (SNAPPY_CODEC | BZIP2_CODEC | XZ_CODEC) => codec
- case unknown => throw new IllegalArgumentException(s"Invalid compression codec: $unknown")
- }
- job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, codec)
- }
-
- new AvroOutputWriterFactory(dataSchema, outputAvroSchema.toString)
- }
-
- override def buildReader(
- spark: SparkSession,
- dataSchema: StructType,
- partitionSchema: StructType,
- requiredSchema: StructType,
- filters: Seq[Filter],
- options: Map[String, String],
- hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
-
- val broadcastedConf =
- spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val parsedOptions = new AvroOptions(options, hadoopConf)
-
- (file: PartitionedFile) => {
- val conf = broadcastedConf.value.value
- val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
-
- // TODO Removes this check once `FileFormat` gets a general file filtering interface method.
- // Doing input file filtering is improper because we may generate empty tasks that process no
- // input files but stress the scheduler. We should probably add a more general input file
- // filtering mechanism for `FileFormat` data sources. See SPARK-16317.
- if (parsedOptions.ignoreExtension || file.filePath.endsWith(".avro")) {
- val reader = {
- val in = new FsInput(new Path(new URI(file.filePath)), conf)
- try {
- val datumReader = userProvidedSchema match {
- case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
- case _ => new GenericDatumReader[GenericRecord]()
- }
- DataFileReader.openReader(in, datumReader)
- } catch {
- case NonFatal(e) =>
- logError("Exception while opening DataFileReader", e)
- in.close()
- throw e
- }
- }
-
- // Ensure that the reader is closed even if the task fails or doesn't consume the entire
- // iterator of records.
- Option(TaskContext.get()).foreach { taskContext =>
- taskContext.addTaskCompletionListener[Unit] { _ =>
- reader.close()
- }
- }
-
- reader.sync(file.start)
- val stop = file.start + file.length
-
- val deserializer =
- new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), requiredSchema)
-
- new Iterator[InternalRow] {
- private[this] var completed = false
-
- override def hasNext: Boolean = {
- if (completed) {
- false
- } else {
- val r = reader.hasNext && !reader.pastSync(stop)
- if (!r) {
- reader.close()
- completed = true
- }
- r
- }
- }
-
- override def next(): InternalRow = {
- if (!hasNext) {
- throw new NoSuchElementException("next on empty iterator")
- }
- val record = reader.next()
- deserializer.deserialize(record).asInstanceOf[InternalRow]
- }
- }
- } else {
- Iterator.empty
- }
- }
- }
-
- override def supportDataType(dataType: DataType): Boolean = dataType match {
- case _: AtomicType => true
-
- case st: StructType => st.forall { f => supportDataType(f.dataType) }
-
- case ArrayType(elementType, _) => supportDataType(elementType)
-
- case MapType(keyType, valueType, _) =>
- supportDataType(keyType) && supportDataType(valueType)
-
- case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
-
- case _: NullType => true
-
- case _ => false
- }
-}
-
-private[avro] object AvroFileFormat {
- val IgnoreFilesWithoutExtensionProperty = "avro.mapred.ignore.inputs.without.extension"
}
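
Note that supportsDataType recurses structurally, so a nested type is accepted only if every leaf type is. A quick sketch of the behavior:

    import org.apache.spark.sql.avro.AvroUtils
    import org.apache.spark.sql.types._

    // Nested combinations of atomic types are supported...
    AvroUtils.supportsDataType(StructType(Seq(
      StructField("tags", ArrayType(StringType)),
      StructField("attrs", MapType(StringType, LongType)))))  // true

    // ...while CalendarIntervalType is not an AtomicType and is rejected,
    // which is why AvroSuite below now also accepts the V2 error message.
    AvroUtils.supportsDataType(CalendarIntervalType)           // false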
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
new file mode 100644
index 0000000..3171f1e
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroDataSourceV2.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.v2.avro
+
+import org.apache.spark.sql.avro.AvroFileFormat
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
+import org.apache.spark.sql.sources.v2.Table
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class AvroDataSourceV2 extends FileDataSourceV2 {
+
+ override def fallbackFileFormat: Class[_ <: FileFormat] = classOf[AvroFileFormat]
+
+ override def shortName(): String = "avro"
+
+ override def getTable(options: CaseInsensitiveStringMap): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ AvroTable(tableName, sparkSession, options, paths, None, fallbackFileFormat)
+ }
+
+ override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
+ val paths = getPaths(options)
+ val tableName = getTableName(paths)
+ AvroTable(tableName, sparkSession, options, paths, Some(schema), fallbackFileFormat)
+ }
+}
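
fallbackFileFormat is the hook that the USE_V1_SOURCE_* configurations act on: listing "avro" there routes the source back through the V1 AvroFileFormat, which is exactly what the AvroV1Suite/AvroV2Suite pairs at the bottom of this commit exercise. A sketch of the toggle, mirroring those suites (the typed SparkConf.set(ConfigEntry, ...) overload is private[spark], so this form only compiles inside Spark's own code base, as the tests do):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.internal.SQLConf

    // Force the V1 path for avro; set both entries to "" to keep the new V2 path.
    val conf = new SparkConf()
      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "avro")
      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "avro")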
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
new file mode 100644
index 0000000..243af7d
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroPartitionReaderFactory.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.v2.avro
+
+import java.net.URI
+
+import scala.util.control.NonFatal
+
+import org.apache.avro.Schema
+import org.apache.avro.file.DataFileReader
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
+import org.apache.avro.mapred.FsInput
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.TaskContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroOptions}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.v2.{EmptyPartitionReader, FilePartitionReaderFactory, PartitionReaderWithPartitionValues}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.reader.PartitionReader
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * A factory used to create AVRO readers.
+ *
+ * @param sqlConf SQL configuration.
+ * @param broadcastedConf Broadcast serializable Hadoop Configuration.
+ * @param dataSchema Schema of AVRO files.
+ * @param readDataSchema Required data schema of AVRO files.
+ * @param partitionSchema Schema of partitions.
+ * @param options Options for parsing AVRO files.
+ */
+case class AvroPartitionReaderFactory(
+ sqlConf: SQLConf,
+ broadcastedConf: Broadcast[SerializableConfiguration],
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ partitionSchema: StructType,
+ options: Map[String, String]) extends FilePartitionReaderFactory with Logging {
+
+ override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = {
+ val conf = broadcastedConf.value.value
+ val parsedOptions = new AvroOptions(options, conf)
+ val userProvidedSchema = parsedOptions.schema.map(new Schema.Parser().parse)
+
+ if (parsedOptions.ignoreExtension || partitionedFile.filePath.endsWith(".avro")) {
+ val reader = {
+ val in = new FsInput(new Path(new URI(partitionedFile.filePath)), conf)
+ try {
+ val datumReader = userProvidedSchema match {
+ case Some(userSchema) => new GenericDatumReader[GenericRecord](userSchema)
+ case _ => new GenericDatumReader[GenericRecord]()
+ }
+ DataFileReader.openReader(in, datumReader)
+ } catch {
+ case NonFatal(e) =>
+ logError("Exception while opening DataFileReader", e)
+ in.close()
+ throw e
+ }
+ }
+
+ // Ensure that the reader is closed even if the task fails or doesn't consume the entire
+ // iterator of records.
+ Option(TaskContext.get()).foreach { taskContext =>
+ taskContext.addTaskCompletionListener[Unit] { _ =>
+ reader.close()
+ }
+ }
+
+ reader.sync(partitionedFile.start)
+ val stop = partitionedFile.start + partitionedFile.length
+
+ val deserializer =
+ new AvroDeserializer(userProvidedSchema.getOrElse(reader.getSchema), readDataSchema)
+
+ val fileReader = new PartitionReader[InternalRow] {
+ private[this] var completed = false
+
+ override def next(): Boolean = {
+ if (completed) {
+ false
+ } else {
+ val r = reader.hasNext && !reader.pastSync(stop)
+ if (!r) {
+ reader.close()
+ completed = true
+ }
+ r
+ }
+ }
+
+ override def get(): InternalRow = {
+ if (!next) {
+ throw new NoSuchElementException("next on empty iterator")
+ }
+ val record = reader.next()
+ deserializer.deserialize(record).asInstanceOf[InternalRow]
+ }
+
+ override def close(): Unit = reader.close()
+ }
+ new PartitionReaderWithPartitionValues(fileReader, readDataSchema,
+ partitionSchema, partitionedFile.partitionValues)
+ } else {
+ new EmptyPartitionReader[InternalRow]
+ }
+ }
+}
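
Where the V1 buildReader exposed an Iterator[InternalRow] (hasNext/next), the V2 factory returns a PartitionReader with a next()/get()/close() contract. A minimal sketch of how a caller drives such a reader:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.PartitionReader

    // Advance with next(), fetch the current row with get(), always close().
    def consume(reader: PartitionReader[InternalRow])(f: InternalRow => Unit): Unit = {
      try {
        while (reader.next()) {
          f(reader.get())
        }
      } finally {
        reader.close()
      }
    }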
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
new file mode 100644
index 0000000..6ec3510
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScan.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.v2.avro
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScan
+import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.SerializableConfiguration
+
+case class AvroScan(
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ dataSchema: StructType,
+ readDataSchema: StructType,
+ readPartitionSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends FileScan(sparkSession, fileIndex, readDataSchema, readPartitionSchema) {
+ override def isSplitable(path: Path): Boolean = true
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case sensitive.
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ val broadcastedConf = sparkSession.sparkContext.broadcast(
+ new SerializableConfiguration(hadoopConf))
+ // The partition values are already truncated in `FileScan.partitions`.
+ // We should use `readPartitionSchema` as the partition schema here.
+ AvroPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf,
+ dataSchema, readDataSchema, readPartitionSchema, caseSensitiveMap)
+ }
+ }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala
new file mode 100644
index 0000000..815da2b
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroScanBuilder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.v2.avro
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
+import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
+import org.apache.spark.sql.sources.v2.reader.Scan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class AvroScanBuilder (
+ sparkSession: SparkSession,
+ fileIndex: PartitioningAwareFileIndex,
+ schema: StructType,
+ dataSchema: StructType,
+ options: CaseInsensitiveStringMap)
+ extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
+ override def build(): Scan = {
+ AvroScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options)
+ }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
new file mode 100644
index 0000000..a781624
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.v2.avro
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.FileStatus
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.AvroUtils
+import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.v2.FileTable
+import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+case class AvroTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat])
+ extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
+ override def newScanBuilder(options: CaseInsensitiveStringMap): AvroScanBuilder =
+ new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+
+ override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
+ AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
+
+ override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder =
+ new AvroWriteBuilder(options, paths, formatName, supportsDataType)
+
+ override def supportsDataType(dataType: DataType): Boolean = AvroUtils.supportsDataType(dataType)
+
+ override def formatName: String = "AVRO"
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala
new file mode 100644
index 0000000..c2ddc4b
--- /dev/null
+++ b/external/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroWriteBuilder.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.v2.avro
+
+import org.apache.hadoop.mapreduce.Job
+
+import org.apache.spark.sql.avro.AvroUtils
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class AvroWriteBuilder(
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ formatName: String,
+ supportsDataType: DataType => Boolean)
+ extends FileWriteBuilder(options, paths, formatName, supportsDataType) {
+ override def prepareWrite(
+ sqlConf: SQLConf,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ AvroUtils.prepareWrite(sqlConf, job, options, dataSchema)
+ }
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 79ba287..9638276 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -24,14 +24,14 @@ import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
-class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+abstract class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
import testImplicits._
val dateSchema = s"""
@@ -349,3 +349,19 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU
}
}
}
+
+class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "avro")
+ .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "avro")
+}
+
+class AvroV2LogicalTypeSuite extends AvroLogicalTypeSuite {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+ .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 49aa218..40bf3b1 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
import org.apache.spark.sql.execution.datasources.DataSource
@@ -42,7 +42,7 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+abstract class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
import testImplicits._
val episodesAvro = testFile("episodes.avro")
@@ -81,7 +81,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("resolve avro data source") {
val databricksAvro = "com.databricks.spark.avro"
// By default the backward compatibility for com.databricks.spark.avro is enabled.
- Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
+ Seq("org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
classOf[org.apache.spark.sql.avro.AvroFileFormat])
}
@@ -1000,7 +1000,8 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
var msg = intercept[AnalysisException] {
sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
- assert(msg.contains("Cannot save interval data type into external storage."))
+ assert(msg.contains("Cannot save interval data type into external storage.") ||
+ msg.contains("AVRO data source does not support calendarinterval data type."))
msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
@@ -1492,3 +1493,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
""".stripMargin)
}
}
+
+class AvroV1Suite extends AvroSuite {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "avro")
+ .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "avro")
+}
+
+class AvroV2Suite extends AvroSuite {
+ override protected def sparkConf: SparkConf =
+ super
+ .sparkConf
+ .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+ .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+}