You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/03 12:45:18 UTC

[spark] branch branch-3.0 updated: [SPARK-31327][SQL] Write Spark version into Avro file metadata

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d36e2a7  [SPARK-31327][SQL] Write Spark version into Avro file metadata
d36e2a7 is described below

commit d36e2a721d0e3827477ca5a1aaf55700587380cf
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Apr 3 12:43:33 2020 +0000

    [SPARK-31327][SQL] Write Spark version into Avro file metadata
    
    ### What changes were proposed in this pull request?
    
    Write Spark version into Avro file metadata
    
    ### Why are the changes needed?
    
    The version info is very useful for backward compatibility. This is also done for Parquet/ORC.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new test
    
    Closes #28102 from cloud-fan/avro.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 6b1ca886c0066f4e10534336f3fce64cdebc79a5)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/avro/SparkAvroKeyOutputFormat.java   | 94 ++++++++++++++++++++++
 .../apache/spark/sql/avro/AvroOutputWriter.scala   | 12 ++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 14 +++-
 .../main/scala/org/apache/spark/sql/package.scala  |  1 +
 4 files changed, 116 insertions(+), 5 deletions(-)

diff --git a/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
new file mode 100644
index 0000000..55696a6
--- /dev/null
+++ b/external/avro/src/main/java/org/apache/spark/sql/avro/SparkAvroKeyOutputFormat.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.avro;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapreduce.AvroKeyOutputFormat;
+import org.apache.avro.mapreduce.Syncable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+// A variant of `AvroKeyOutputFormat`, which is used to inject the custom `RecordWriterFactory` so
+// that we can set Avro file metadata.
+public class SparkAvroKeyOutputFormat extends AvroKeyOutputFormat<GenericRecord> {
+  public SparkAvroKeyOutputFormat(Map<String, String> metadata) {
+    super(new SparkRecordWriterFactory(metadata));
+  }
+
+  static class SparkRecordWriterFactory extends RecordWriterFactory<GenericRecord> {
+    private final Map<String, String> metadata;
+    SparkRecordWriterFactory(Map<String, String> metadata) {
+      this.metadata = metadata;
+    }
+
+    protected RecordWriter<AvroKey<GenericRecord>, NullWritable> create(
+        Schema writerSchema,
+        GenericData dataModel,
+        CodecFactory compressionCodec,
+        OutputStream outputStream,
+        int syncInterval) throws IOException {
+      return new SparkAvroKeyRecordWriter(
+        writerSchema, dataModel, compressionCodec, outputStream, syncInterval, metadata);
+    }
+  }
+}
+
+// This is a fork of org.apache.avro.mapreduce.AvroKeyRecordWriter, in order to set file metadata.
+class SparkAvroKeyRecordWriter<T> extends RecordWriter<AvroKey<T>, NullWritable>
+    implements Syncable {
+
+  private final DataFileWriter<T> mAvroFileWriter;
+
+  SparkAvroKeyRecordWriter(
+      Schema writerSchema,
+      GenericData dataModel,
+      CodecFactory compressionCodec,
+      OutputStream outputStream,
+      int syncInterval,
+      Map<String, String> metadata) throws IOException {
+    this.mAvroFileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
+    for (Map.Entry<String, String> entry : metadata.entrySet()) {
+      this.mAvroFileWriter.setMeta(entry.getKey(), entry.getValue());
+    }
+    this.mAvroFileWriter.setCodec(compressionCodec);
+    this.mAvroFileWriter.setSyncInterval(syncInterval);
+    this.mAvroFileWriter.create(writerSchema, outputStream);
+  }
+
+  public void write(AvroKey<T> record, NullWritable ignore) throws IOException {
+    this.mAvroFileWriter.append(record.datum());
+  }
+
+  public void close(TaskAttemptContext context) throws IOException {
+    this.mAvroFileWriter.close();
+  }
+
+  public long sync() throws IOException {
+    return this.mAvroFileWriter.sync();
+  }
+}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
index 0650711..2cfa3a4 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOutputWriter.scala
@@ -19,14 +19,17 @@ package org.apache.spark.sql.avro
 
 import java.io.{IOException, OutputStream}
 
+import scala.collection.JavaConverters._
+
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.mapred.AvroKey
-import org.apache.avro.mapreduce.AvroKeyOutputFormat
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 
+import org.apache.spark.SPARK_VERSION_SHORT
+import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
@@ -45,8 +48,9 @@ private[avro] class AvroOutputWriter(
    * Overrides the couple of methods responsible for generating the output streams / files so
    * that the data can be correctly partitioned
    */
-  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] =
-    new AvroKeyOutputFormat[GenericRecord]() {
+  private val recordWriter: RecordWriter[AvroKey[GenericRecord], NullWritable] = {
+    val sparkVersion = Map(SPARK_VERSION_METADATA_KEY -> SPARK_VERSION_SHORT).asJava
+    new SparkAvroKeyOutputFormat(sparkVersion) {
 
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
@@ -57,8 +61,8 @@ private[avro] class AvroOutputWriter(
         val path = getDefaultWorkFile(context, ".avro")
         path.getFileSystem(context.getConfiguration).create(path)
       }
-
     }.getRecordWriter(context)
+  }
 
   override def write(row: InternalRow): Unit = {
     val key = new AvroKey(serializer.serialize(row).asInstanceOf[GenericRecord])
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 9336d8e..a5224fd1 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,7 +33,7 @@ import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWri
 import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -1620,6 +1620,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-31327: Write Spark version into Avro file metadata") {
+    withTempPath { path =>
+      spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
+      val avroFiles = path.listFiles()
+        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+      assert(avroFiles.length === 1)
+      val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
+      val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
+      assert(version === SPARK_VERSION_SHORT)
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 6187593..58de675 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -49,6 +49,7 @@ package object sql {
    * Metadata key which is used to write Spark version in the followings:
    * - Parquet file metadata
    * - ORC file metadata
+   * - Avro file metadata
    *
    * Note that Hive table property `spark.sql.create.version` also has Spark version.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org