You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/07/09 14:49:32 UTC

[flink] branch release-1.10 updated: [FLINK-18478] Use AvroFactory.extractAvroSpecificSchema in AvroDeserializationSchema

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new cf047a4  [FLINK-18478] Use AvroFactory.extractAvroSpecificSchema in AvroDeserializationSchema
cf047a4 is described below

commit cf047a4d3357784705b0af2ec281cdefce278d7e
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Thu Jul 9 10:02:45 2020 +0200

    [FLINK-18478] Use AvroFactory.extractAvroSpecificSchema in AvroDeserializationSchema
    
    This makes AvroFactory and its extractAvroSpecificSchema method public;
    both were package-private before.
    
    This fixes the problem that AvroDeserializationSchema did not work
    with types generated by avrohugger. It can also be seen simply as a
    refactoring/code cleanup.
---
 .../java/org/apache/flink/formats/avro/AvroDeserializationSchema.java | 3 ++-
 .../java/org/apache/flink/formats/avro/typeutils/AvroFactory.java     | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index a9bdcee..28e2a78 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroFactory;
 import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
@@ -144,7 +145,7 @@ public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
 		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
 			SpecificData specificData = new SpecificData(cl);
 			this.datumReader = new SpecificDatumReader<>(specificData);
-			this.reader = specificData.getSchema(recordClazz);
+			this.reader = AvroFactory.extractAvroSpecificSchema(recordClazz, specificData);
 		} else {
 			this.reader = new Schema.Parser().parse(schemaString);
 			GenericData genericData = new GenericData(cl);
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
index 4916a90..ee8250a 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <T> The type to be serialized.
  */
 @Internal
-final class AvroFactory<T> {
+public final class AvroFactory<T> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AvroFactory.class);
 
@@ -138,7 +138,7 @@ final class AvroFactory<T> {
 	 * Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this either via {@link
 	 * SpecificData} or by instantiating a record and extracting the schema from the instance.
 	 */
-	static <T> Schema extractAvroSpecificSchema(
+	public static <T> Schema extractAvroSpecificSchema(
 			Class<T> type,
 			SpecificData specificData) {
 		Optional<Schema> newSchemaOptional = tryExtractAvroSchemaViaInstance(type);