You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/24 09:45:06 UTC

[flink] branch release-1.10 updated: [FLINK-19339] Support unions with logical types in Avro >= 1.9.x

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new b042a48  [FLINK-19339] Support unions with logical types in Avro >= 1.9.x
b042a48 is described below

commit b042a48c96b90f58333e91446797832d41620b47
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Sep 21 21:26:30 2020 +0200

    [FLINK-19339] Support unions with logical types in Avro >= 1.9.x
    
    Avro 1.9.x introduced yet another mechanism for registering/looking up
    conversions for logical types.
    
    If a logical type is part of a union a static field MODEL$ of type SpecificData
    is added to the generated Avro class with registered conversions for logical types.
    In this commit we try to use that SpecificData in AvroSerializer and
    AvroDeserializationSchema whenever available instead of instantiating a new
    unrelated one.
    
    The change is backwards compatible, and if the field is not available we
    fall back to the old behavior.
---
 .../formats/avro/AvroDeserializationSchema.java    |  7 ++++--
 .../flink/formats/avro/typeutils/AvroFactory.java  | 26 +++++++++++++++++++---
 .../avro/typeutils/AvroSerializerSnapshot.java     |  3 ++-
 .../avro/AvroDeserializationSchemaTest.java        |  2 +-
 4 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index 28e2a78..a5f6ac0 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -143,7 +143,10 @@ public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
 
 		ClassLoader cl = Thread.currentThread().getContextClassLoader();
 		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
-			SpecificData specificData = new SpecificData(cl);
+			@SuppressWarnings("unchecked")
+			SpecificData specificData = AvroFactory.getSpecificDataForClass(
+				(Class<? extends SpecificData>) recordClazz,
+				cl);
 			this.datumReader = new SpecificDatumReader<>(specificData);
 			this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
 		} else {
@@ -162,7 +165,7 @@ public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public TypeInformation<T> getProducedType() {
 		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
 			return new AvroTypeInfo(recordClazz);
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
index ee8250a..0ec0b0b 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.formats.avro.typeutils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -40,6 +41,7 @@ import org.apache.avro.specific.SpecificRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
 import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -95,9 +97,9 @@ public final class AvroFactory<T> {
 		return (schemaString == null) ? null : new Schema.Parser().parse(schemaString);
 	}
 
-	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+	@SuppressWarnings({"OptionalUsedAsFieldOrParameterType", "unchecked"})
 	private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
-		SpecificData specificData = new SpecificData(cl);
+		SpecificData specificData = getSpecificDataForClass((Class<? extends SpecificData>) type, cl);
 		Schema newSchema = extractAvroSpecificSchema(type, specificData);
 
 		return new AvroFactory<>(
@@ -146,11 +148,29 @@ public final class AvroFactory<T> {
 	}
 
 	/**
+	 * Creates a {@link SpecificData} object for a given class. Possibly uses the specific data from the generated
+	 * class with logical conversions applied (avro >= 1.9.x).
+	 *
+	 * <p>Copied over from  {@code SpecificData#getForClass(Class<T> c)} we do not use the method directly, because
+	 * we want to be API backwards compatible with older Avro versions which did not have this method
+	 */
+	public static <T extends SpecificData> SpecificData getSpecificDataForClass(Class<T> type, ClassLoader cl) {
+		try {
+			Field specificDataField = type.getDeclaredField("MODEL$");
+			specificDataField.setAccessible(true);
+			return  (SpecificData) specificDataField.get((Object) null);
+		} catch (IllegalAccessException e) {
+			throw new FlinkRuntimeException("Could not access the MODEL$ field of avro record", e);
+		} catch (NoSuchFieldException e) {
+			return new SpecificData(cl);
+		}
+	}
+
+	/**
 	 * Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this by creating an
 	 * instance of the class using the zero-argument constructor and calling {@link
 	 * SpecificRecord#getSchema()} on it.
 	 */
-	@SuppressWarnings("unchecked")
 	private static Optional<Schema> tryExtractAvroSchemaViaInstance(Class<?> type) {
 		try {
 			SpecificRecord instance = (SpecificRecord) type.newInstance();
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
index 316a162..8f8153c 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
@@ -185,7 +185,8 @@ public class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
 			return null;
 		}
 		if (isSpecificRecord(runtimeType)) {
-			SpecificData d = new SpecificData(cl);
+			@SuppressWarnings("unchecked")
+			SpecificData d = AvroFactory.getSpecificDataForClass((Class<? extends SpecificData>) runtimeType, cl);
 			return AvroFactory.extractAvroSpecificSchema(runtimeType, d);
 		}
 		ReflectData d = new ReflectData(cl);
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
index 8a15fbf..28b3d9e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
@@ -52,7 +52,7 @@ public class AvroDeserializationSchemaTest {
 	}
 
 	@Test
-	public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
+	public void testSpecificRecord() throws Exception {
 		DeserializationSchema<Address> deserializer = AvroDeserializationSchema.forSpecific(Address.class);
 
 		byte[] encodedAddress = writeRecord(address, Address.getClassSchema());