You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/03 12:45:18 UTC
[spark] branch branch-3.0 updated: [SPARK-31327][SQL] Write Spark
version into Avro file metadata
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d36e2a7 [SPARK-31327][SQL] Write Spark version into Avro file metadata
d36e2a7 is described below
commit d36e2a721d0e3827477ca5a1aaf55700587380cf
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Apr 3 12:43:33 2020 +0000
[SPARK-31327][SQL] Write Spark version into Avro file metadata
### What changes were proposed in this pull request?
Write Spark version into Avro file metadata
### Why are the changes needed?
The version info is very useful for backward compatibility. This is also done in parquet/orc.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
new test
Closes #28102 from cloud-fan/avro.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 6b1ca886c0066f4e10534336f3fce64cdebc79a5)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/avro/SparkAvroKeyOutputFormat.java | 94 ++++++++++++++++++++++
.../apache/spark/sql/avro/AvroOutputWriter.scala | 12 ++-
.../org/apache/spark/sql/avro/AvroSuite.scala | 14 +++-
.../main/scala/org/apache/spark/sql/package.scala | 1 +
4 files changed, 116 insertions(+), 5 deletions(-)
diff --git a/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
new file mode 100644
index 0000000..55696a6
--- /dev/null
+++ b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.Syncable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+// A variant of `AvroKeyOutputFormat`, which is used to inject the custom `RecordWriterFactory` so
+// that we can set avro file metadata.
+public class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
+ public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
+ super(new SparkRecordWriterFactory(metadata));
+ }
+
+ static class SparkRecordWriterFactory extends RecordWriterFactory<GenericRecord> {
+ private final Map<String, String> metadata;
+ SparkRecordWriterFactory(Map<String, String> metadata) {
+ this.metadata = metadata;
+ }
+
+ protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
+ Schema writerSchema,
+ GenericData dataModel,
+ CodecFactory compressionCodec,
+ OutputStream outputStream,
+ int syncInterval) throws IOException {
+ return new SparkAvroKeyRecordWriter(
+ writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
+ }
+ }
+}
+
+// This is a fork of org.apache.avro.mapreduce.AvroKeyRecordWriter, in order to set file metadata.
+class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
+ implements Syncable {
+
+ private final DataFileWriter<T> mAvroFileWriter;
+
+ SparkAvroKeyRecordWriter(
+ Schema writerSchema,
+ GenericData dataModel,
+ CodecFactory compressionCodec,
+ OutputStream outputStream,
+ int syncInterval,
+ Map<String, String> metadata) throws IOException {
+ this.mAvroFileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
+ for (Map.Entry<String, String> entry : metadata.entrySet()) {
+ this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
+ }
+ this.mAvroFileWriter.setCodec(compressionCodec);
+ this.mAvroFileWriter.setSyncInterval(syncInterval);
+ this.mAvroFileWriter.create(writerSchema, outputStream);
+ }
+
+ public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
+ this.mAvroFileWriter.append(record.datum());
+ }
+
+ public void close(TaskAttemptContext context) throws IOException {
+ this.mAvroFileWriter.close();
+ }
+
+ public long sync() throws IOException {
+ return this.mAvroFileWriter.sync();
+ }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 0650711..2cfa3a4 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -19,14 +19,17 @@ package org.apache.spark.sql.avro
import java.io.{IOException, OutputStream}
+import scala.collection.JavaConverters._
+
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
-import org.apache.avro.mapreduce.AvroKeyOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._
@@ -45,8 +48,9 @@ private[avro] class AvroOutputWriter(
* Overrides the couple of methods responsible for generating the output streams / files so
* that the data can be correctly partitioned
*/
- private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
- new AvroKeyOutputFormat[GenericRecord]() {
+ private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
+ val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava
+ new SparkAvroKeyOutputFormat(sparkVersion) {
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
new Path(path)
@@ -57,8 +61,8 @@ private[avro] class AvroOutputWriter(
val path = getDefaultWorkFile(context, ".avro")
path.getFileSystem(context.getConfiguration).create(path)
}
-
}.getRecordWriter(context)
+ }
override def write(row: InternalRow): Unit = {
val key = new AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord])
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 9336d8e..a5224fd1 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri
import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
import org.apache.commons.io.FileUtils
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -1620,6 +1620,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-31327: Write Spark version into Avro file metadata") {
+ withTempPath { path =>
+ spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
+ val avroFiles = path.listFiles()
+ .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+ assert(avroFiles.length === 1)
+ val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
+ val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
+ assert(version === SPARK_VERSION_SHORT)
+ }
+ }
}
class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 6187593..58de675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -49,6 +49,7 @@ package object sql {
* Metadata key which is used to write Spark version in the followings:
* - Parquet file metadata
* - ORC file metadata
+ * - Avro file metadata
*
* Note that Hive table property `spark.sql.create.version` also has Spark version.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org