You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/31 05:59:42 UTC

[GitHub] tzulitai closed pull request #6151: [FLINK-9569] [avro] Fix confusing construction of GenericRecord AvroSerializers

tzulitai closed pull request #6151: [FLINK-9569] [avro] Fix confusing construction of GenericRecord AvroSerializers
URL: https://github.com/apache/flink/pull/6151
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index b313625bfe2..09bd5dd8ef7 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
@@ -46,6 +47,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -67,6 +69,7 @@
  *
  * @param <T> The type to be serialized.
  */
+@Internal
 public class AvroSerializer<T> extends TypeSerializer<T> {
 
 	private static final long serialVersionUID = 1L;
@@ -105,41 +108,54 @@
 	/** The currently accessing thread, set and checked on debug level only. */
 	private transient volatile Thread currentThread;
 
-	// ------------------------------------------------------------------------
+	// ----------------------- instantiation methods --------------------------
 
 	/**
 	 * Creates a new AvroSerializer for the type indicated by the given class.
-	 * This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
-	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
+	 *
+	 * <p>This instantiation method is expected to be used only with {@link GenericRecord}.
+	 * For {@link SpecificRecord} or reflection serializer use {@link AvroSerializer#forNonGeneric(Class)}.
+	 *
+	 * @param schema the explicit schema to use for generic records.
 	 */
-	public AvroSerializer(Class<T> type) {
-		checkArgument(!isGenericRecord(type),
-			"For GenericData.Record use constructor with explicit schema.");
-		this.type = checkNotNull(type);
-		this.schemaString = null;
+	public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+		return new AvroSerializer<>(GenericRecord.class, schema);
 	}
 
 	/**
 	 * Creates a new AvroSerializer for the type indicated by the given class.
-	 * This constructor is expected to be used only with {@link GenericData.Record}.
-	 * For {@link SpecificRecord} or reflection serializer use
-	 * {@link AvroSerializer#AvroSerializer(Class)}
+	 *
+	 * <p>This instantiation method is intended to be used with {@link SpecificRecord} or reflection serializer.
+	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#forGeneric(Schema)}.
+	 *
+	 * @param type the type to be serialized.
 	 */
-	public AvroSerializer(Class<T> type, Schema schema) {
-		checkArgument(isGenericRecord(type),
-			"For classes other than GenericData.Record use constructor without explicit schema.");
-		this.type = checkNotNull(type);
-		this.schema = checkNotNull(schema);
-		this.schemaString = schema.toString();
+	public static <T> AvroSerializer<T> forNonGeneric(Class<T> type) {
+		checkArgument(!isGenericRecord(type),
+			"For generic records, use AvroSerializer.forGeneric(schema) to provide an explicit schema.");
+
+		return new AvroSerializer<>(type, null);
 	}
 
+
+	// ------------------------------------------------------------------------
+
 	/**
-	 * @deprecated Use {@link AvroSerializer#AvroSerializer(Class)} instead.
+	 * Private constructor.
+	 *
+	 * @param type the type to be serialized.
+	 * @param schema the explicit schema to use. This should be non-null only when
+	 *               the type to be serialized is {@link GenericRecord}.
 	 */
-	@Deprecated
-	@SuppressWarnings("unused")
-	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		this(type);
+	private AvroSerializer(Class<T> type, @Nullable Schema schema) {
+		this.type = checkNotNull(type);
+		this.schema = schema;
+
+		if (schema == null) {
+			this.schemaString = null;
+		} else {
+			this.schemaString = schema.toString();
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -311,12 +327,7 @@ private static boolean isGenericRecord(Class<?> type) {
 
 	@Override
 	public TypeSerializer<T> duplicate() {
-		if (schemaString != null) {
-			return new AvroSerializer<>(type, schema);
-		} else {
-			return new AvroSerializer<>(type);
-
-		}
+		return new AvroSerializer<>(type, schema);
 	}
 
 	@Override
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 644ee50d361..ca5cf308a2d 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -82,7 +82,7 @@ public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
 		return useBackwardsCompatibleSerializer ?
 				new BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
-				new AvroSerializer<>(getTypeClass());
+				AvroSerializer.forNonGeneric(getTypeClass());
 	}
 
 	@SuppressWarnings("unchecked")
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
index e5eb5d89a1f..4080e280453 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
@@ -61,7 +61,7 @@
 	 */
 	public BackwardsCompatibleAvroSerializer(Class<T> type) {
 		this.type = type;
-		this.serializer = new AvroSerializer<>(type);
+		this.serializer = AvroSerializer.forNonGeneric(type);
 	}
 
 	/**
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
index 83d590acaf9..0df85529790 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
@@ -77,7 +77,7 @@ public boolean isKeyType() {
 
 	@Override
 	public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig config) {
-		return new AvroSerializer<>(GenericRecord.class, schema);
+		return AvroSerializer.forGeneric(schema);
 	}
 
 	@Override
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
index 5744abc1657..e02f9bad646 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/AvroKryoSerializerUtils.java
@@ -70,7 +70,7 @@ public void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegist
 
 	@Override
 	public <T> TypeSerializer<T> createAvroSerializer(Class<T> type) {
-		return new AvroSerializer<>(type);
+		return AvroSerializer.forNonGeneric(type);
 	}
 
 	@Override
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
index 89be9c06d33..16c594fd736 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericArraySerializerTest.java
@@ -28,6 +28,6 @@
 
 	@Override
 	protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
-		return new AvroSerializer<T>(type);
+		return AvroSerializer.forNonGeneric(type);
 	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
index a247766ae15..40811ddd24f 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeComparatorTest.java
@@ -28,6 +28,6 @@
 
 	@Override
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new AvroSerializer<>(type);
+		return AvroSerializer.forNonGeneric(type);
 	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
index 1c1a19b324c..f8b5302f5da 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroGenericTypeSerializerTest.java
@@ -28,6 +28,6 @@
 
 	@Override
 	protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
-		return new AvroSerializer<>(type);
+		return AvroSerializer.forNonGeneric(type);
 	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
index aaa9b4b08b7..5dcf6a770f4 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerConcurrencyTest.java
@@ -40,7 +40,7 @@
 
 	@Test
 	public void testConcurrentUseOfSerializer() throws Exception {
-		final AvroSerializer<String> serializer = new AvroSerializer<>(String.class);
+		final AvroSerializer<String> serializer = AvroSerializer.forNonGeneric(String.class);
 
 		final BlockerSync sync = new BlockerSync();
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
index bb3d911b3d3..354b78e79cd 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerEmptyArrayTest.java
@@ -37,7 +37,7 @@
 	public void testBookSerialization() {
 		try {
 			Book b = new Book(123, "This is a test book", 26382648);
-			AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
+			AvroSerializer<Book> serializer = AvroSerializer.forNonGeneric(Book.class);
 			SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
 			test.testAll();
 		}
@@ -61,7 +61,7 @@ public void testSerialization() {
 			a.books = books;
 			a.bookType = BookAuthor.BookType.journal;
 
-			AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
+			AvroSerializer<BookAuthor> serializer = AvroSerializer.forNonGeneric(BookAuthor.class);
 
 			SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
 			test.testAll();
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
index c15aa7c0141..65ca4103e31 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSerializabilityTest.java
@@ -40,7 +40,7 @@
 
 	@Test
 	public void testDeserializeSerializer() throws Exception {
-		final AvroSerializer<String> currentSerializer = new AvroSerializer<>(String.class);
+		final AvroSerializer<String> currentSerializer = AvroSerializer.forNonGeneric(String.class);
 
 		try (ObjectInputStream in = new ObjectInputStream(
 				getClass().getClassLoader().getResourceAsStream(RESOURCE_NAME))) {
@@ -57,7 +57,7 @@ public void testDeserializeSerializer() throws Exception {
 	// ------------------------------------------------------------------------
 
 	public static void main(String[] args) throws Exception {
-		final AvroSerializer<String> serializer = new AvroSerializer<>(String.class);
+		final AvroSerializer<String> serializer = AvroSerializer.forNonGeneric(String.class);
 
 		final File file = new File("flink-formats/flink-avro/src/test/resources/" + RESOURCE_NAME).getAbsoluteFile();
 
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
index 0ab58683f5a..3d7c099f5c8 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerTest.java
@@ -32,7 +32,7 @@
 
 	@Override
 	protected TypeSerializer<User> createSerializer() {
-		return new AvroSerializer<>(User.class);
+		return AvroSerializer.forNonGeneric(User.class);
 	}
 
 	@Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services