You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/09/29 08:04:18 UTC

[flink] branch release-1.9 updated: [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 1196887  [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory
1196887 is described below

commit 1196887407c9ef3fad7901f1c592c022b890b227
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Fri Aug 30 10:42:58 2019 +0200

    [FLINK-12501] Use SpecificRecord.getSchema in AvroFactory
    
    Before, we were using SpecificData.getSchema(type) which was not working
    for types that were generated using Avrohugger (for Scala) because
    the SCHEMA was generated in the companion object. Now we use a method
    that must be available on all SpecificRecord(s).
    
    We still use the old method as a fallback if we cannot instantiate or
    call getSchema() on the instance.
---
 .../flink/formats/avro/typeutils/AvroFactory.java  | 36 +++++++++++++++++++++-
 .../avro/typeutils/AvroSerializerSnapshot.java     |  2 +-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
index 9a8bdcb..4916a90 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -37,6 +37,8 @@ import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 
@@ -50,6 +52,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 final class AvroFactory<T> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(AvroFactory.class);
+
 	private final DataOutputEncoder encoder = new DataOutputEncoder();
 	private final DataInputDecoder decoder = new DataInputDecoder();
 
@@ -94,7 +98,7 @@ final class AvroFactory<T> {
 	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
 	private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
 		SpecificData specificData = new SpecificData(cl);
-		Schema newSchema = specificData.getSchema(type);
+		Schema newSchema = extractAvroSpecificSchema(type, specificData);
 
 		return new AvroFactory<>(
 			specificData,
@@ -130,6 +134,36 @@ final class AvroFactory<T> {
 		);
 	}
 
+	/**
+	 * Extracts the Avro {@link Schema} of a {@link SpecificRecord} class. The schema is read from an
+	 * instance of {@code type} when possible, otherwise from {@code specificData.getSchema(type)}.
+	 */
+	static <T> Schema extractAvroSpecificSchema(
+			Class<T> type,
+			SpecificData specificData) {
+		Optional<Schema> newSchemaOptional = tryExtractAvroSchemaViaInstance(type);
+		return newSchemaOptional.orElseGet(() -> specificData.getSchema(type));
+	}
+
+	/**
+	 * Tries to obtain the Avro {@link Schema} by creating an instance of {@code type} with its
+	 * zero-argument constructor and calling {@link SpecificRecord#getSchema()} on it. The result is
+	 * empty if that schema is {@code null} or if instantiation fails (logged as a warning).
+	 */
+	@SuppressWarnings("unchecked")
+	private static Optional<Schema> tryExtractAvroSchemaViaInstance(Class<?> type) {
+		try {
+			SpecificRecord instance = (SpecificRecord) type.newInstance();
+			return Optional.ofNullable(instance.getSchema());
+		} catch (InstantiationException | IllegalAccessException e) {
+			LOG.warn(
+					"Could not extract schema from Avro-generated SpecificRecord class {}: {}.",
+					type,
+					e);
+			return Optional.empty();
+		}
+	}
+
 	private AvroFactory(
 		GenericData avroData,
 		Schema schema,
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
index a5f47d1..316a162 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
@@ -186,7 +186,7 @@ public class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
 		}
 		if (isSpecificRecord(runtimeType)) {
 			SpecificData d = new SpecificData(cl);
-			return d.getSchema(runtimeType);
+			return AvroFactory.extractAvroSpecificSchema(runtimeType, d);
 		}
 		ReflectData d = new ReflectData(cl);
 		return d.getSchema(runtimeType);