You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/30 13:59:07 UTC

[GitHub] asfgit closed pull request #6881: [FLINK-10605] [core] Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot interface

asfgit closed pull request #6881: [FLINK-10605] [core] Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot interface
URL: https://github.com/apache/flink/pull/6881
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 03bacfab134..00000000000
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.flink.annotation.Public;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import static org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema;
-
-/**
- * @deprecated Please use <code>org.apache.flink.formats.avro.typeutils.AvroTypeInfo</code>
- * in the <code>flink-avro</code> module. This class will be removed in the near future.
- */
-@Deprecated
-@Public
-public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
-
-	public AvroTypeInfo(Class<T> typeClass) {
-		super(typeClass, generateFieldsFromAvroSchema(typeClass, true));
-	}
-}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index 5f76b094361..00000000000
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.formats.avro.utils.DataInputDecoder;
-import org.apache.flink.formats.avro.utils.DataOutputEncoder;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Old deprecated Avro serializer. It is retained for a smoother experience when
- * upgrading from an earlier Flink savepoint that stored this serializer.
- */
-@Internal
-@Deprecated
-@SuppressWarnings({"unused", "deprecation"})
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final Class<T> type;
-
-	private final Class<? extends T> typeToInstantiate;
-
-	/**
-	 * Map of class tag (using classname as tag) to their Kryo registration.
-	 *
-	 * <p>This map serves as a preview of the final registration result of
-	 * the Kryo instance, taking into account registration overwrites.
-	 */
-	private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
-
-	private transient ReflectDatumWriter<T> writer;
-	private transient ReflectDatumReader<T> reader;
-
-	private transient DataOutputEncoder encoder;
-	private transient DataInputDecoder decoder;
-
-	private transient Kryo kryo;
-
-	private transient T deepCopyInstance;
-
-	// --------------------------------------------------------------------------------------------
-
-	public AvroSerializer(Class<T> type) {
-		this(type, type);
-	}
-
-	public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
-		this.type = checkNotNull(type);
-		this.typeToInstantiate = checkNotNull(typeToInstantiate);
-
-		InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
-		this.kryoRegistrations = buildKryoRegistrations(type);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public AvroSerializer<T> duplicate() {
-		return new AvroSerializer<>(type, typeToInstantiate);
-	}
-
-	@Override
-	public T createInstance() {
-		return InstantiationUtil.instantiate(this.typeToInstantiate);
-	}
-
-	@Override
-	public T copy(T from) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, kryo, this);
-	}
-
-	@Override
-	public T copy(T from, T reuse) {
-		checkKryoInitialized();
-
-		return KryoUtils.copy(from, reuse, kryo, this);
-	}
-
-	@Override
-	public int getLength() {
-		return -1;
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		checkAvroInitialized();
-		this.encoder.setOut(target);
-		this.writer.write(value, this.encoder);
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		checkAvroInitialized();
-		this.decoder.setIn(source);
-		return this.reader.read(null, this.decoder);
-	}
-
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		checkAvroInitialized();
-		this.decoder.setIn(source);
-		return this.reader.read(reuse, this.decoder);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		checkAvroInitialized();
-
-		if (this.deepCopyInstance == null) {
-			this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
-		}
-
-		this.decoder.setIn(source);
-		this.encoder.setOut(target);
-
-		T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
-		this.writer.write(tmp, this.encoder);
-	}
-
-	private void checkAvroInitialized() {
-		if (this.reader == null) {
-			this.reader = new ReflectDatumReader<>(type);
-			this.writer = new ReflectDatumWriter<>(type);
-			this.encoder = new DataOutputEncoder();
-			this.decoder = new DataInputDecoder();
-		}
-	}
-
-	private void checkKryoInitialized() {
-		if (this.kryo == null) {
-			this.kryo = new Kryo();
-
-			Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
-			instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
-			kryo.setInstantiatorStrategy(instantiatorStrategy);
-
-			kryo.setAsmEnabled(true);
-
-			KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof AvroSerializer) {
-			@SuppressWarnings("unchecked")
-			AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
-			return avroSerializer.canEqual(this) &&
-					type == avroSerializer.type &&
-					typeToInstantiate == avroSerializer.typeToInstantiate;
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof AvroSerializer;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Serializer configuration snapshotting & compatibility
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
-		return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-			final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
-
-			if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
-				// resolve Kryo registrations; currently, since the Kryo registrations in Avro
-				// are fixed, there shouldn't be a problem with the resolution here.
-
-				LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
-				oldRegistrations.putAll(kryoRegistrations);
-
-				for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
-					if (reconfiguredRegistrationEntry.getValue().isDummy()) {
-						return CompatibilityResult.requiresMigration();
-					}
-				}
-
-				this.kryoRegistrations = oldRegistrations;
-				return CompatibilityResult.compatible();
-			}
-		}
-
-		// ends up here if the preceding serializer is not
-		// the ValueSerializer, or serialized data type has changed
-		return CompatibilityResult.requiresMigration();
-	}
-
-	/**
-	 * Config snapshot for this serializer.
-	 */
-	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
-
-		private static final int VERSION = 1;
-
-		private Class<? extends T> typeToInstantiate;
-
-		public AvroSerializerConfigSnapshot() {}
-
-		public AvroSerializerConfigSnapshot(
-				Class<T> baseType,
-				Class<? extends T> typeToInstantiate,
-				LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
-
-			super(baseType, kryoRegistrations);
-			this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			super.write(out);
-
-			out.writeUTF(typeToInstantiate.getName());
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void read(DataInputView in) throws IOException {
-			super.read(in);
-
-			String classname = in.readUTF();
-			try {
-				typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
-			} catch (ClassNotFoundException e) {
-				throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
-			}
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-
-		public Class<? extends T> getTypeToInstantiate() {
-			return typeToInstantiate;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		in.defaultReadObject();
-
-		// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
-		if (kryoRegistrations == null) {
-			this.kryoRegistrations = buildKryoRegistrations(type);
-		}
-	}
-
-	private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
-		final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
-
-		// register Avro types.
-		registrations.put(
-				GenericData.Array.class.getName(),
-				new KryoRegistration(
-						GenericData.Array.class,
-						new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
-		registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
-		registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
-		registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
-		registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
-
-		// register the serialized data type
-		registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
-
-		return registrations;
-	}
-}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
index 2d06476d1c5..a9bdcee8bb5 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -164,7 +164,7 @@ public boolean isEndOfStream(T nextElement) {
 	@SuppressWarnings("unchecked")
 	public TypeInformation<T> getProducedType() {
 		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
-			return new AvroTypeInfo(recordClazz, false);
+			return new AvroTypeInfo(recordClazz);
 		} else {
 			return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.reader);
 		}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
new file mode 100644
index 00000000000..0ca25bfaa04
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.formats.avro.utils.DataInputDecoder;
+import org.apache.flink.formats.avro.utils.DataOutputEncoder;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.Nullable;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Creates Avro {@link DatumReader} and {@link DatumWriter}.
+ *
+ * @param <T> The type to be serialized.
+ */
+@Internal
+final class AvroFactory<T> {
+
+	private final DataOutputEncoder encoder = new DataOutputEncoder();
+	private final DataInputDecoder decoder = new DataInputDecoder();
+
+	private final GenericData avroData;
+	private final Schema schema;
+	private final DatumWriter<T> writer;
+	private final DatumReader<T> reader;
+
+	/**
+	 * Creates Avro Writer and Reader for a specific type.
+	 *
+	 * <p>Given an input type, and possibly the current schema, and a previously known schema (also known as writer
+	 * schema) create will deduce the best way to initialize a reader and writer according to the following rules:
+	 * <ul>
+	 * <li>If type is an Avro generated class (a {@link SpecificRecord}) then the reader would use the
+	 * previousSchema for reading (if present) otherwise it would use the schema attached to the auto generated
+	 * class.
+	 * <li>If the type is a GenericRecord then the reader and the writer would be created with the supplied
+	 * (mandatory) schema.
+	 * <li>Otherwise, we use Avro's reflection based reader and writer that would deduce the schema via reflection.
+	 * If the previous schema is also present (when restoring a serializer for example) then the reader would be
+	 * created with both schemas.
+	 * </ul>
+	 */
+	static <T> AvroFactory<T> create(Class<T> type, @Nullable Schema currentSchema, @Nullable Schema previousSchema) {
+		final ClassLoader cl = Thread.currentThread().getContextClassLoader();
+
+		if (SpecificRecord.class.isAssignableFrom(type)) {
+			return fromSpecific(type, cl, Optional.ofNullable(previousSchema));
+		}
+		if (GenericRecord.class.isAssignableFrom(type)) {
+			return fromGeneric(cl, currentSchema);
+		}
+		return fromReflective(type, cl, Optional.ofNullable(previousSchema));
+	}
+
+	static <T> AvroFactory<T> createFromTypeAndSchemaString(Class<T> type, @Nullable String schemaString) {
+		Schema schema = (schemaString != null) ? new Schema.Parser().parse(schemaString) : null;
+		return create(type, schema, null);
+	}
+
+	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+	private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
+		SpecificData specificData = new SpecificData(cl);
+		Schema newSchema = specificData.getSchema(type);
+
+		return new AvroFactory<>(
+			specificData,
+			newSchema,
+			new SpecificDatumReader<>(previousSchema.orElse(newSchema), newSchema, specificData),
+			new SpecificDatumWriter<>(newSchema, specificData)
+		);
+	}
+
+	private static <T> AvroFactory<T> fromGeneric(ClassLoader cl, Schema schema) {
+		checkNotNull(schema,
+			"Unable to create an AvroSerializer with a GenericRecord type without a schema");
+		GenericData genericData = new GenericData(cl);
+
+		return new AvroFactory<>(
+			genericData,
+			schema,
+			new GenericDatumReader<>(schema, schema, genericData),
+			new GenericDatumWriter<>(schema, genericData)
+		);
+	}
+
+	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+	private static <T> AvroFactory<T> fromReflective(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
+		ReflectData reflectData = new ReflectData(cl);
+		Schema newSchema = reflectData.getSchema(type);
+
+		return new AvroFactory<>(
+			reflectData,
+			newSchema,
+			new ReflectDatumReader<>(previousSchema.orElse(newSchema), newSchema, reflectData),
+			new ReflectDatumWriter<>(newSchema, reflectData)
+		);
+	}
+
+	private AvroFactory(
+		GenericData avroData,
+		Schema schema,
+		DatumReader<T> reader,
+		DatumWriter<T> writer) {
+
+		this.avroData = checkNotNull(avroData);
+		this.schema = checkNotNull(schema);
+		this.writer = checkNotNull(writer);
+		this.reader = checkNotNull(reader);
+	}
+
+	DataOutputEncoder getEncoder() {
+		return encoder;
+	}
+
+	DataInputDecoder getDecoder() {
+		return decoder;
+	}
+
+	Schema getSchema() {
+		return schema;
+	}
+
+	DatumWriter<T> getWriter() {
+		return writer;
+	}
+
+	DatumReader<T> getReader() {
+		return reader;
+	}
+
+	GenericData getAvroData() {
+		return avroData;
+	}
+}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index dad1d6df16a..37aa2d3bc44 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
@@ -33,15 +34,10 @@
 import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
 import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.Nullable;
 import org.apache.avro.specific.SpecificRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,29 +74,34 @@
 	 * Because this flag is static final, a value of 'false' allows the JIT compiler to eliminate
 	 * the guarded code sections. */
 	private static final boolean CONCURRENT_ACCESS_CHECK =
-			LOG.isDebugEnabled() || AvroSerializerDebugInitHelper.setToDebug;
+		LOG.isDebugEnabled() || AvroSerializerDebugInitHelper.setToDebug;
 
 	// -------- configuration fields, serializable -----------
 
 	/** The class of the type that is serialized by this serializer. */
 	private final Class<T> type;
+	private final SerializableAvroSchema schema;
+	private final SerializableAvroSchema previousSchema;
 
-	private final String schemaString;
+	/** This field was present in this class prior to 1.7, and held the string representation of
+	 * a {@link Schema} (only in the case of an Avro GenericRecord). Since {@code FsStateBackend} stores the serializer
+	 * (via Java serialization) within the checkpoint to later restore the state, we need to have this field here.
+	 * See {@link #initializeAvro()}.
+	 */
+	@Deprecated
+	private final String schemaString = null;
 
 	// -------- runtime fields, non-serializable, lazily initialized -----------
 
-	private transient GenericDatumWriter<T> writer;
-	private transient GenericDatumReader<T> reader;
-
+	private transient GenericData avroData;
+	private transient DatumWriter<T> writer;
 	private transient DataOutputEncoder encoder;
 	private transient DataInputDecoder decoder;
-
-	private transient GenericData avroData;
-
-	private transient Schema schema;
+	private transient DatumReader<T> reader;
+	private transient Schema runtimeSchema;
 
 	/** The serializer configuration snapshot, cached for efficiency. */
-	private transient AvroSchemaSerializerConfigSnapshot<T> configSnapshot;
+	private transient TypeSerializerSnapshot<T> configSnapshot;
 
 	/** The currently accessing thread, set and checked on debug level only. */
 	private transient volatile Thread currentThread;
@@ -113,10 +114,9 @@
 	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
 	 */
 	public AvroSerializer(Class<T> type) {
+		this(checkNotNull(type), new SerializableAvroSchema(), new SerializableAvroSchema());
 		checkArgument(!isGenericRecord(type),
 			"For GenericData.Record use constructor with explicit schema.");
-		this.type = checkNotNull(type);
-		this.schemaString = null;
 	}
 
 	/**
@@ -126,11 +126,19 @@ public AvroSerializer(Class<T> type) {
 	 * {@link AvroSerializer#AvroSerializer(Class)}
 	 */
 	public AvroSerializer(Class<T> type, Schema schema) {
+		this(checkNotNull(type), new SerializableAvroSchema(checkNotNull(schema)), new SerializableAvroSchema());
 		checkArgument(isGenericRecord(type),
 			"For classes other than GenericData.Record use constructor without explicit schema.");
+	}
+
+	/**
+	 * Creates a new AvroSerializer for the type indicated by the given class.
+	 */
+	@Internal
+	AvroSerializer(Class<T> type, @Nullable SerializableAvroSchema newSchema, @Nullable SerializableAvroSchema previousSchema) {
 		this.type = checkNotNull(type);
-		this.schema = checkNotNull(schema);
-		this.schemaString = schema.toString();
+		this.schema = newSchema;
+		this.previousSchema = previousSchema;
 	}
 
 	/**
@@ -167,7 +175,7 @@ public int getLength() {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public T createInstance() {
+	public T createInstance()  {
 		return InstantiationUtil.instantiate(type);
 	}
 
@@ -237,7 +245,7 @@ public T copy(T from) {
 
 		try {
 			checkAvroInitialized();
-			return avroData.deepCopy(schema, from);
+			return avroData.deepCopy(runtimeSchema, from);
 		}
 		finally {
 			if (CONCURRENT_ACCESS_CHECK) {
@@ -264,10 +272,10 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
 	// ------------------------------------------------------------------------
 
 	@Override
-	public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
+	public TypeSerializerSnapshot<T> snapshotConfiguration() {
 		if (configSnapshot == null) {
 			checkAvroInitialized();
-			configSnapshot = new AvroSchemaSerializerConfigSnapshot<>(schema.toString(false));
+			configSnapshot = new AvroSerializerSnapshot<>(runtimeSchema, type);
 		}
 		return configSnapshot;
 	}
@@ -282,18 +290,10 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
 
 			checkAvroInitialized();
 			final SchemaPairCompatibility compatibility =
-					SchemaCompatibility.checkReaderWriterCompatibility(schema, lastSchema);
+				SchemaCompatibility.checkReaderWriterCompatibility(runtimeSchema, lastSchema);
 
 			return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE ?
-					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
-		}
-		else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
-			// old snapshot case, just compare the type
-			// we don't need to restore any Kryo stuff, since Kryo was never used for persistence,
-			// only for object-to-object copies.
-			final AvroSerializerConfigSnapshot<T> old = (AvroSerializerConfigSnapshot<T>) configSnapshot;
-			return type.equals(old.getTypeClass()) ?
-					CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
+				CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
 		}
 		else {
 			return CompatibilityResult.requiresMigration();
@@ -304,19 +304,15 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static boolean isGenericRecord(Class<?> type) {
+	static boolean isGenericRecord(Class<?> type) {
 		return !SpecificRecord.class.isAssignableFrom(type) &&
 			GenericRecord.class.isAssignableFrom(type);
 	}
 
 	@Override
 	public TypeSerializer<T> duplicate() {
-		if (schemaString != null) {
-			return new AvroSerializer<>(type, schema);
-		} else {
-			return new AvroSerializer<>(type);
-
-		}
+		checkAvroInitialized();
+		return new AvroSerializer<>(type, new SerializableAvroSchema(runtimeSchema), previousSchema);
 	}
 
 	@Override
@@ -359,32 +355,26 @@ private void checkAvroInitialized() {
 	}
 
 	private void initializeAvro() {
-		final ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
-		if (SpecificRecord.class.isAssignableFrom(type)) {
-			SpecificData specificData = new SpecificData(cl);
-			this.avroData = specificData;
-			this.schema = specificData.getSchema(type);
-			this.reader = new SpecificDatumReader<>(schema, schema, specificData);
-			this.writer = new SpecificDatumWriter<>(schema, specificData);
-		} else if (GenericRecord.class.isAssignableFrom(type)) {
-			if (schema == null) {
-				this.schema = new Schema.Parser().parse(schemaString);
-			}
-			GenericData genericData = new GenericData(cl);
-			this.avroData = genericData;
-			this.reader = new GenericDatumReader<>(schema, schema, genericData);
-			this.writer = new GenericDatumWriter<>(schema, genericData);
-		} else {
-			final ReflectData reflectData = new ReflectData(cl);
-			this.avroData = reflectData;
-			this.schema = reflectData.getSchema(type);
-			this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
-			this.writer = new ReflectDatumWriter<>(schema, reflectData);
-		}
-
-		this.encoder = new DataOutputEncoder();
-		this.decoder = new DataInputDecoder();
+		final AvroFactory<T> factory;
+		if (wasThisInstanceDeserializedFromAPre17Version()) {
+			// since schema is a final field that is initialized to a non null value,
+			// this can only have happened when restoring from a checkpoint in an FsStateBackend pre Flink 1.7.
+			// To maintain backwards compatibility we need to use the information stored at schemaString.
+			factory = AvroFactory.createFromTypeAndSchemaString(type, schemaString);
+		}
+		else {
+			factory = AvroFactory.create(type, schema.getAvroSchema(), previousSchema.getAvroSchema());
+		}
+		this.runtimeSchema = factory.getSchema();
+		this.writer = factory.getWriter();
+		this.reader = factory.getReader();
+		this.encoder = factory.getEncoder();
+		this.decoder = factory.getDecoder();
+		this.avroData = factory.getAvroData();
+	}
+
+	private boolean wasThisInstanceDeserializedFromAPre17Version() {
+		return (schema == null);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -403,8 +393,8 @@ private void enterExclusiveThread() {
 		}
 		else if (previous != thisThread) {
 			throw new IllegalStateException(
-					"Concurrent access to KryoSerializer. Thread 1: " + thisThread.getName() +
-							" , Thread 2: " + previous.getName());
+				"Concurrent access to KryoSerializer. Thread 1: " + thisThread.getName() +
+					" , Thread 2: " + previous.getName());
 		}
 	}
 
@@ -412,13 +402,20 @@ private void exitExclusiveThread() {
 		currentThread = null;
 	}
 
+	Schema getAvroSchema() {
+		checkAvroInitialized();
+		return runtimeSchema;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Serializer Snapshots
 	// ------------------------------------------------------------------------
 
 	/**
 	 * A config snapshot for the Avro Serializer that stores the Avro Schema to check compatibility.
+	 * This class is now deprecated and only kept for backward compatibility.
 	 */
+	@Deprecated
 	public static final class AvroSchemaSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {
 
 		private String schemaString;
@@ -427,8 +424,13 @@ private void exitExclusiveThread() {
 		 * Default constructor for instantiation via reflection.
 		 */
 		@SuppressWarnings("unused")
-		public AvroSchemaSerializerConfigSnapshot() {}
+		public AvroSchemaSerializerConfigSnapshot() {
+		}
 
+		/**
+		 * AvroSerializer now uses the new {@link AvroSerializerSnapshot} class instead.
+		 */
+		@SuppressWarnings("unused")
 		public AvroSchemaSerializerConfigSnapshot(String schemaString) {
 			this.schemaString = checkNotNull(schemaString);
 		}
@@ -485,47 +487,4 @@ public String toString() {
 		}
 	}
 
-	/**
-	 * The outdated config snapshot, retained for backwards compatibility.
-	 *
-	 * @deprecated The {@link AvroSchemaSerializerConfigSnapshot} should be used instead.
-	 */
-	@Deprecated
-	public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
-
-		private static final int VERSION = 1;
-
-		private Class<? extends T> typeToInstantiate;
-
-		public AvroSerializerConfigSnapshot() {}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			super.write(out);
-
-			out.writeUTF(typeToInstantiate.getName());
-		}
-
-		@SuppressWarnings("unchecked")
-		@Override
-		public void read(DataInputView in) throws IOException {
-			super.read(in);
-
-			String classname = in.readUTF();
-			try {
-				typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
-			} catch (ClassNotFoundException e) {
-				throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
-			}
-		}
-
-		@Override
-		public int getVersion() {
-			return VERSION;
-		}
-
-		public Class<? extends T> getTypeToInstantiate() {
-			return typeToInstantiate;
-		}
-	}
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
new file mode 100644
index 00000000000..cb549ebd98e
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.formats.avro.typeutils.AvroSerializer.isGenericRecord;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@code Avro} specific implementation of a {@link TypeSerializerSnapshot}.
+ *
+ * @param <T> The data type that the originating serializer of this configuration serializes.
+ */
+public final class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
+	private Class<T> runtimeType;
+	private Schema schema;
+	private Schema runtimeSchema;
+
+	@SuppressWarnings("WeakerAccess")
+	public AvroSerializerSnapshot() {
+		// this constructor is used when restoring from a checkpoint.
+	}
+
+	AvroSerializerSnapshot(Schema schema, Class<T> runtimeType) {
+		this.schema = schema;
+		this.runtimeType = runtimeType;
+	}
+
+	@Override
+	public int getCurrentVersion() {
+		return 1;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		checkNotNull(runtimeType);
+		checkNotNull(schema);
+
+		out.writeUTF(runtimeType.getName());
+		out.writeUTF(schema.toString(false));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		final String previousRuntimeTypeName = in.readUTF();
+		final String previousSchemaDefinition = in.readUTF();
+
+		this.runtimeType = findClassOrThrow(userCodeClassLoader, previousRuntimeTypeName);
+		this.schema = parseAvroSchema(previousSchemaDefinition);
+		this.runtimeSchema = tryExtractAvroSchema(userCodeClassLoader, runtimeType);
+	}
+
+	@Override
+	public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
+	resolveSchemaCompatibility(NS newSerializer) {
+		if (!(newSerializer instanceof AvroSerializer)) {
+			return TypeSerializerSchemaCompatibility.incompatible();
+		}
+		AvroSerializer<?> newAvroSerializer = (AvroSerializer<?>) newSerializer;
+		return resolveSchemaCompatibility(schema, newAvroSerializer.getAvroSchema());
+	}
+
+	@Override
+	public TypeSerializer<T> restoreSerializer() {
+		checkNotNull(runtimeType);
+		checkNotNull(schema);
+
+		if (runtimeSchema != null) {
+			return new AvroSerializer<>(runtimeType, new SerializableAvroSchema(runtimeSchema), new SerializableAvroSchema(schema));
+		}
+		else {
+			return new AvroSerializer<>(runtimeType, new SerializableAvroSchema(schema), new SerializableAvroSchema(schema));
+		}
+	}
+
+	// ------------------------------------------------------------------------------------------------------------
+	// Helpers
+	// ------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Resolves writer/reader schema compatibility.
+	 *
+	 * <p>Checks whether a new version of a schema (reader) can read values serialized with the old schema (writer),
+	 * i.e. whether the schemas are compatible according to the {@code Avro} schema resolution rules
+	 * (see <a href="https://avro.apache.org/docs/current/spec.html#Schema+Resolution">Schema Resolution</a>).
+	 */
+	@VisibleForTesting
+	static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
+		Schema writerSchema,
+		Schema readerSchema) {
+
+		if (Objects.equals(writerSchema, readerSchema)) {
+			return TypeSerializerSchemaCompatibility.compatibleAsIs();
+		}
+
+		final SchemaPairCompatibility compatibility =
+			SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
+
+		return avroCompatibilityToFlinkCompatibility(compatibility);
+	}
+
+	private static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
+	avroCompatibilityToFlinkCompatibility(SchemaPairCompatibility compatibility) {
+		switch (compatibility.getType()) {
+			case COMPATIBLE: {
+				// The new serializer would be able to read data persisted with *this* serializer, therefore no migration
+				// is required.
+				return TypeSerializerSchemaCompatibility.compatibleAsIs();
+			}
+			case INCOMPATIBLE: {
+				return TypeSerializerSchemaCompatibility.incompatible();
+			}
+			case RECURSION_IN_PROGRESS:
+			default:
+				return TypeSerializerSchemaCompatibility.incompatible();
+		}
+	}
+
+	private static Schema parseAvroSchema(String previousSchemaDefinition) {
+		Schema.Parser parser = new Schema.Parser();
+		return parser.parse(previousSchemaDefinition);
+	}
+
+	private static Schema tryExtractAvroSchema(ClassLoader cl, Class<?> runtimeType) {
+		if (isGenericRecord(runtimeType)) {
+			return null;
+		}
+		if (isSpecificRecord(runtimeType)) {
+			SpecificData d = new SpecificData(cl);
+			return d.getSchema(runtimeType);
+		}
+		ReflectData d = new ReflectData(cl);
+		return d.getSchema(runtimeType);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static <T> Class<T> findClassOrThrow(ClassLoader userCodeClassLoader, String className) {
+		try {
+			Class<?> runtimeTarget = Class.forName(className, false, userCodeClassLoader);
+			return (Class<T>) runtimeTarget;
+		}
+		catch (ClassNotFoundException e) {
+			throw new IllegalStateException(""
+				+ "Unable to find the class '" + className + "' which is used to deserialize "
+				+ "the elements of this serializer. "
+				+ "Were the class was moved or renamed?", e);
+		}
+	}
+
+	private static boolean isSpecificRecord(Class<?> runtimeType) {
+		return SpecificRecord.class.isAssignableFrom(runtimeType);
+	}
+}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
index 644ee50d361..09ce1868707 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroTypeInfo.java
@@ -33,7 +33,6 @@
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs)
@@ -50,52 +49,22 @@
 
 	private static final long serialVersionUID = 1L;
 
-	private static final ConcurrentHashMap<Thread, Boolean> IN_BACKWARDS_COMPATIBLE_MODE = new ConcurrentHashMap<>();
-
-	private final boolean useBackwardsCompatibleSerializer;
-
 	/**
 	 * Creates a new Avro type info for the given class.
 	 */
 	public AvroTypeInfo(Class<T> typeClass) {
-		this(typeClass, false);
-	}
-
-	/**
-	 * Creates a new Avro type info for the given class.
-	 *
-	 * <p>This constructor takes a flag to specify whether a serializer
-	 * that is backwards compatible with PoJo-style serialization of Avro types should be used.
-	 * That is only necessary, if one has a Flink 1.3 (or earlier) savepoint where Avro types
-	 * were stored in the checkpointed state. New Flink programs will never need this.
-	 */
-	public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer) {
-		super(typeClass, generateFieldsFromAvroSchema(typeClass, useBackwardsCompatibleSerializer));
-
-		final Boolean modeOnStack = IN_BACKWARDS_COMPATIBLE_MODE.get(Thread.currentThread());
-		this.useBackwardsCompatibleSerializer = modeOnStack == null ?
-				useBackwardsCompatibleSerializer : modeOnStack;
+		super(typeClass, generateFieldsFromAvroSchema(typeClass));
 	}
 
 	@Override
 	@SuppressWarnings("deprecation")
 	public TypeSerializer<T> createSerializer(ExecutionConfig config) {
-		return useBackwardsCompatibleSerializer ?
-				new BackwardsCompatibleAvroSerializer<>(getTypeClass()) :
-				new AvroSerializer<>(getTypeClass());
+		return new AvroSerializer<>(getTypeClass());
 	}
 
 	@SuppressWarnings("unchecked")
 	@Internal
-	public static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(
-			Class<T> typeClass,
-			boolean useBackwardsCompatibleSerializer) {
-
-		final Thread currentThread = Thread.currentThread();
-		final boolean entryPoint =
-				IN_BACKWARDS_COMPATIBLE_MODE.putIfAbsent(currentThread, useBackwardsCompatibleSerializer) == null;
-
-		try {
+	private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
 			PojoTypeExtractor pte = new PojoTypeExtractor();
 			ArrayList<Type> typeHierarchy = new ArrayList<>();
 			typeHierarchy.add(typeClass);
@@ -121,12 +90,6 @@ public AvroTypeInfo(Class<T> typeClass, boolean useBackwardsCompatibleSerializer
 				newFields.add(newField);
 			}
 			return newFields;
-		}
-		finally {
-			if (entryPoint) {
-				IN_BACKWARDS_COMPATIBLE_MODE.remove(currentThread);
-			}
-		}
 	}
 
 	private static class PojoTypeExtractor extends TypeExtractor {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
deleted file mode 100644
index 63b79b2d064..00000000000
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializer.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSchemaSerializerConfigSnapshot;
-import org.apache.flink.formats.avro.typeutils.AvroSerializer.AvroSerializerConfigSnapshot;
-
-import org.apache.avro.specific.SpecificRecordBase;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * An Avro serializer that can switch back to a KryoSerializer or a Pojo Serializer, if
- * it has to ensure compatibility with one of those.
- *
- * <p>This serializer is there only as a means to explicitly fall back to PoJo serialization
- * in the case where an upgrade from an earlier savepoint was made.
- *
- * @param <T> The type to be serialized.
- */
-@SuppressWarnings("deprecation")
-public class BackwardsCompatibleAvroSerializer<T> extends TypeSerializer<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** The type to serialize. */
-	private final Class<T> type;
-
-	/** The type serializer currently used. Avro by default. */
-	private TypeSerializer<T> serializer;
-
-	/**
-	 * Creates a new backwards-compatible Avro Serializer, for the given type.
-	 */
-	public BackwardsCompatibleAvroSerializer(Class<T> type) {
-		this.type = type;
-		this.serializer = new AvroSerializer<>(type);
-	}
-
-	/**
-	 * Private copy constructor.
-	 */
-	private BackwardsCompatibleAvroSerializer(Class<T> type, TypeSerializer<T> serializer) {
-		this.type = type;
-		this.serializer = serializer;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Properties
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean isImmutableType() {
-		return serializer.isImmutableType();
-	}
-
-	@Override
-	public int getLength() {
-		return serializer.getLength();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Serialization
-	// ------------------------------------------------------------------------
-
-	@Override
-	public T createInstance() {
-		return serializer.createInstance();
-	}
-
-	@Override
-	public void serialize(T value, DataOutputView target) throws IOException {
-		serializer.serialize(value, target);
-	}
-
-	@Override
-	public T deserialize(DataInputView source) throws IOException {
-		return serializer.deserialize(source);
-	}
-
-	@Override
-	public T deserialize(T reuse, DataInputView source) throws IOException {
-		return serializer.deserialize(reuse, source);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Copying
-	// ------------------------------------------------------------------------
-
-	@Override
-	public T copy(T from) {
-		return serializer.copy(from);
-	}
-
-	@Override
-	public T copy(T from, T reuse) {
-		return serializer.copy(from, reuse);
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		serializer.copy(source, target);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public TypeSerializer<T> duplicate() {
-		return new BackwardsCompatibleAvroSerializer<>(type, serializer.duplicate());
-	}
-
-	@Override
-	public int hashCode() {
-		return type.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj == this) {
-			return true;
-		}
-		else if (obj != null && obj.getClass() == BackwardsCompatibleAvroSerializer.class) {
-			final BackwardsCompatibleAvroSerializer that = (BackwardsCompatibleAvroSerializer) obj;
-			return this.type == that.type && this.serializer.equals(that.serializer);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj.getClass() == this.getClass();
-	}
-
-	@Override
-	public String toString() {
-		return getClass().getName() + " (" + type.getName() + ')';
-	}
-
-	// ------------------------------------------------------------------------
-	//  Configuration Snapshots and Upgrades
-	// ------------------------------------------------------------------------
-
-	@Override
-	public TypeSerializerSnapshot<T> snapshotConfiguration() {
-		// we return the configuration of the actually used serializer here
-		return serializer.snapshotConfiguration();
-	}
-
-	@Override
-	public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
-		if (configSnapshot instanceof AvroSchemaSerializerConfigSnapshot ||
-				configSnapshot instanceof AvroSerializerConfigSnapshot) {
-
-			// avro serializer, nice :-)
-			checkState(serializer instanceof AvroSerializer,
-					"Serializer was changed backwards to PojoSerializer and now encounters AvroSerializer snapshot.");
-
-			return serializer.ensureCompatibility(configSnapshot);
-		}
-		else if (configSnapshot instanceof PojoSerializerConfigSnapshot) {
-			// common previous case
-			checkState(SpecificRecordBase.class.isAssignableFrom(type),
-					"BackwardsCompatibleAvroSerializer resuming a state serialized " +
-							"via a PojoSerializer, but not for an Avro Specific Record");
-
-			final AvroTypeInfo<? extends SpecificRecordBase> typeInfo =
-					new AvroTypeInfo<>(type.asSubclass(SpecificRecordBase.class), true);
-
-			@SuppressWarnings("unchecked")
-			final TypeSerializer<T> pojoSerializer =
-					(TypeSerializer<T>) typeInfo.createPojoSerializer(new ExecutionConfig());
-			this.serializer = pojoSerializer;
-			return serializer.ensureCompatibility(configSnapshot);
-		}
-		else if (configSnapshot instanceof KryoRegistrationSerializerConfigSnapshot) {
-			// force-kryo old case common previous case
-			// we create a new Kryo Serializer with a blank execution config.
-			// registrations are anyways picked up from the snapshot.
-			serializer = new KryoSerializer<>(type, new ExecutionConfig());
-			return serializer.ensureCompatibility(configSnapshot);
-		}
-		else {
-			// completely incompatible type, needs migration
-			return CompatibilityResult.requiresMigration();
-		}
-	}
-}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
new file mode 100644
index 00000000000..fb7114489d6
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/SerializableAvroSchema.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.Internal;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.avro.reflect.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A wrapper for Avro {@link Schema}, that is Java serializable.
+ */
+@Internal
+final class SerializableAvroSchema implements Serializable {
+
+	private static final long serialVersionUID = 1;
+
+	private transient @Nullable Schema schema;
+
+	SerializableAvroSchema() {
+	}
+
+	SerializableAvroSchema(Schema schema) {
+		this.schema = schema;
+	}
+
+	Schema getAvroSchema() {
+		return schema;
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		if (schema == null) {
+			oos.writeBoolean(false);
+		}
+		else {
+			oos.writeBoolean(true);
+			oos.writeUTF(schema.toString(false));
+		}
+	}
+
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		if (ois.readBoolean()) {
+			String schema = ois.readUTF();
+			this.schema = new Parser().parse(schema);
+		}
+		else {
+			this.schema = null;
+		}
+	}
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
new file mode 100644
index 00000000000..a6d98dccdc4
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshotTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericRecord;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.function.Function;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test {@link AvroSerializerSnapshot}.
+ */
+public class AvroSerializerSnapshotTest {
+
+	private static final Schema FIRST_NAME = SchemaBuilder.record("name")
+		.namespace("org.apache.flink")
+		.fields()
+		.requiredString("first")
+		.endRecord();
+
+	private static final Schema FIRST_REQUIRED_LAST_OPTIONAL = SchemaBuilder.record("name")
+		.namespace("org.apache.flink")
+		.fields()
+		.requiredString("first")
+		.optionalString("last")
+		.endRecord();
+
+	private static final Schema BOTH_REQUIRED = SchemaBuilder.record("name")
+		.namespace("org.apache.flink")
+		.fields()
+		.requiredString("first")
+		.requiredString("last")
+		.endRecord();
+
+	@Test
+	public void sameSchemaShouldBeCompatibleAsIs() {
+		assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME, FIRST_NAME), isCompatibleAsIs());
+	}
+
+	@Test
+	public void removingAnOptionalFieldsIsCompatibleAsIs() {
+		assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_REQUIRED_LAST_OPTIONAL, FIRST_NAME),
+			isCompatibleAsIs());
+	}
+
+	@Test
+	public void addingAnOptionalFieldsIsCompatibleAsIs() {
+		assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_NAME, FIRST_REQUIRED_LAST_OPTIONAL),
+			isCompatibleAsIs());
+	}
+
+	@Test
+	public void addingARequiredMakesSerializersIncompatible() {
+		assertThat(AvroSerializerSnapshot.resolveSchemaCompatibility(FIRST_REQUIRED_LAST_OPTIONAL, BOTH_REQUIRED),
+			isIncompatible());
+	}
+
+	@Test
+	public void anAvroSnapshotIsCompatibleWithItsOriginatingSerializer() {
+		AvroSerializer<GenericRecord> serializer =
+			new AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+
+		TypeSerializerSnapshot<GenericRecord> snapshot = serializer.snapshotConfiguration();
+
+		assertThat(snapshot.resolveSchemaCompatibility(serializer), isCompatibleAsIs());
+	}
+
+	@Test
+	public void anAvroSnapshotIsCompatibleAfterARoundTrip() throws IOException {
+		AvroSerializer<GenericRecord> serializer =
+			new AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+
+		AvroSerializerSnapshot<GenericRecord> restored = roundTrip(serializer.snapshotConfiguration());
+
+		assertThat(restored.resolveSchemaCompatibility(serializer), isCompatibleAsIs());
+	}
+
+	@Test
+	public void anAvroSpecificRecordIsCompatibleAfterARoundTrip() throws IOException {
+		// user is an avro generated test object.
+		AvroSerializer<User> serializer = new AvroSerializer<>(User.class);
+
+		AvroSerializerSnapshot<User> restored = roundTrip(serializer.snapshotConfiguration());
+
+		assertThat(restored.resolveSchemaCompatibility(serializer), isCompatibleAsIs());
+	}
+
+	@Test
+	public void aPojoIsCompatibleAfterARoundTrip() throws IOException {
+		AvroSerializer<Pojo> serializer = new AvroSerializer<>(Pojo.class);
+
+		AvroSerializerSnapshot<Pojo> restored = roundTrip(serializer.snapshotConfiguration());
+
+		assertThat(restored.resolveSchemaCompatibility(serializer), isCompatibleAsIs());
+	}
+
+	@Test
+	public void recordSerializedShouldBeDeserializeWithTheResortedSerializer() throws IOException {
+		// user is an avro generated test object.
+		final User user = TestDataGenerator.generateRandomUser(new Random());
+		final AvroSerializer<User> originalSerializer = new AvroSerializer<>(User.class);
+		//
+		// first serialize the record
+		//
+		ByteBuffer serializedUser = serialize(originalSerializer, user);
+		//
+		// then restore a serializer from the snapshot
+		//
+		TypeSerializer<User> restoredSerializer = originalSerializer.snapshotConfiguration().restoreSerializer();
+		//
+		// now deserialize the user with the restored serializer.
+		//
+		User restoredUser = deserialize(restoredSerializer, serializedUser);
+
+		assertThat(user, is(restoredUser));
+	}
+
+	@Test
+	public void validSchemaEvaluationShouldResultInCompatibleSerializers() {
+		final AvroSerializer<GenericRecord> originalSerializer = new AvroSerializer<>(GenericRecord.class, FIRST_NAME);
+		final AvroSerializer<GenericRecord> newSerializer = new AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+
+		TypeSerializerSnapshot<GenericRecord> originalSnapshot = originalSerializer.snapshotConfiguration();
+
+		assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer), isCompatibleAsIs());
+	}
+
+	@Test
+	public void nonValidSchemaEvaluationShouldResultInCompatibleSerializers() {
+		final AvroSerializer<GenericRecord> originalSerializer = new AvroSerializer<>(GenericRecord.class, FIRST_REQUIRED_LAST_OPTIONAL);
+		final AvroSerializer<GenericRecord> newSerializer = new AvroSerializer<>(GenericRecord.class, BOTH_REQUIRED);
+
+		TypeSerializerSnapshot<GenericRecord> originalSnapshot = originalSerializer.snapshotConfiguration();
+
+		assertThat(originalSnapshot.resolveSchemaCompatibility(newSerializer), isIncompatible());
+	}
+
+	@Test
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void changingFromGenericToSpecificWithCompatibleSchemaShouldResultInCompatibleSerializers() {
+		// starting with a generic serializer
+		AvroSerializer<Object> generic = new AvroSerializer(GenericRecord.class, User.SCHEMA$);
+		TypeSerializerSnapshot<Object> genericSnapshot = generic.snapshotConfiguration();
+
+		// then upgrading to a specific serializer
+		AvroSerializer<Object> specificSerializer = new AvroSerializer(User.class);
+		specificSerializer.snapshotConfiguration();
+
+		assertThat(genericSnapshot.resolveSchemaCompatibility(specificSerializer), isCompatibleAsIs());
+	}
+
+	// ---------------------------------------------------------------------------------------------------------------
+	// Matchers
+	// ---------------------------------------------------------------------------------------------------------------
+
+	private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAsIs() {
+		return matcher(TypeSerializerSchemaCompatibility::isCompatibleAsIs, "compatible as is");
+	}
+
+	private Matcher<TypeSerializerSchemaCompatibility> isCompatibleAfterMigration() {
+		return matcher(TypeSerializerSchemaCompatibility::isCompatibleAfterMigration,
+			"compatible after migration");
+	}
+
+	private Matcher<TypeSerializerSchemaCompatibility> isIncompatible() {
+		return matcher(TypeSerializerSchemaCompatibility::isIncompatible,
+			"incompatible");
+	}
+
+	private static <T> Matcher<T> matcher(Function<T, Boolean> predicate, String message) {
+		return new TypeSafeDiagnosingMatcher<T>() {
+
+			@Override
+			protected boolean matchesSafely(T item, Description mismatchDescription) {
+				if (predicate.apply(item)) {
+					return true;
+				}
+				mismatchDescription.appendText("not ").appendText(message);
+				return false;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+			}
+		};
+	}
+
+	// ---------------------------------------------------------------------------------------------------------------
+	// Utils
+	// ---------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Serialize an (avro)TypeSerializerSnapshot and deserialize it.
+	 */
+	private static <T> AvroSerializerSnapshot<T> roundTrip(TypeSerializerSnapshot<T> original) throws IOException {
+		// write
+		DataOutputSerializer out = new DataOutputSerializer(1024);
+		original.write(out);
+
+		// init
+		AvroSerializerSnapshot<T> restored = new AvroSerializerSnapshot<>();
+
+		// read
+		DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer());
+		restored.read(restored.getCurrentVersion(), in, original.getClass().getClassLoader());
+
+		return restored;
+	}
+
+	private static <T> ByteBuffer serialize(TypeSerializer<T> serializer, T record) throws IOException {
+		DataOutputSerializer out = new DataOutputSerializer(1024);
+		serializer.serialize(record, out);
+		return out.wrapAsByteBuffer();
+	}
+
+	private static <T> T deserialize(TypeSerializer<T> serializer, ByteBuffer serializedRecord) throws IOException {
+		DataInputView in = new DataInputDeserializer(serializedRecord);
+		return serializer.deserialize(in);
+	}
+
+	// ---------------------------------------------------------------------------------------------------------------
+	// Test classes
+	// ---------------------------------------------------------------------------------------------------------------
+
+	private static class Pojo {
+		private String foo;
+
+		public String getFoo() {
+			return foo;
+		}
+
+		public void setFoo(String foo) {
+			this.foo = foo;
+		}
+	}
+}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
deleted file mode 100644
index 7b8763bfa2f..00000000000
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.avro.typeutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.SimpleUser;
-import org.apache.flink.formats.avro.utils.TestDataGenerator;
-
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This test ensures that state and state configuration created by Flink 1.3 Avro types
- * that used the PojoSerializer still works (in most cases, see notice below).
- *
- * <p><b>Important:</b> Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3)
- * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
- * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types)
- * works properly.
- *
- * <p>This test can be dropped once we drop backwards compatibility with Flink 1.3 snapshots.
- *
- * <p>The {@link BackwardsCompatibleAvroSerializer} does not support custom Kryo registrations (which
- * logical types require for Avro 1.8 because Kryo does not support Joda-Time). We introduced a
- * simpler user record for pre-Avro 1.8 test cases.
- */
-public class BackwardsCompatibleAvroSerializerTest {
-
-	private static final String SNAPSHOT_RESOURCE = "flink-1.6-avro-type-serializer-snapshot";
-
-	private static final String DATA_RESOURCE = "flink-1.6-avro-type-serialized-data";
-
-	@SuppressWarnings("unused")
-	private static final String SNAPSHOT_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + SNAPSHOT_RESOURCE;
-
-	@SuppressWarnings("unused")
-	private static final String DATA_RESOURCE_WRITER = "/data/repositories/flink/flink-formats/flink-avro/src/test/resources/" + DATA_RESOURCE;
-
-	private static final long RANDOM_SEED = 143065108437678L;
-
-	private static final int NUM_DATA_ENTRIES = 20;
-
-	@Test
-	public void testCompatibilityWithPojoSerializer() throws Exception {
-
-		// retrieve the old config snapshot
-
-		final TypeSerializer<SimpleUser> serializer;
-		final TypeSerializerSnapshot<SimpleUser> configSnapshot;
-
-		try (InputStream in = getClass().getClassLoader().getResourceAsStream(SNAPSHOT_RESOURCE)) {
-			DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
-
-			List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> deserialized =
-					TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
-							inView, getClass().getClassLoader());
-
-			assertEquals(1, deserialized.size());
-
-			@SuppressWarnings("unchecked")
-			final TypeSerializer<SimpleUser> typedSerializer = (TypeSerializer<SimpleUser>) deserialized.get(0).f0;
-
-			serializer = typedSerializer;
-			configSnapshot = (TypeSerializerSnapshot<SimpleUser>) deserialized.get(0).f1;
-		}
-
-		assertNotNull(serializer);
-		assertNotNull(configSnapshot);
-
-		assertTrue(serializer instanceof PojoSerializer);
-		assertTrue(configSnapshot instanceof PojoSerializer.PojoSerializerConfigSnapshot);
-
-		// sanity check for the test: check that the test data works with the original serializer
-		validateDeserialization(serializer);
-
-		// sanity check for the test: check that a PoJoSerializer and the original serializer work together
-		assertFalse(serializer.ensureCompatibility(configSnapshot).isRequiresMigration());
-
-		final TypeSerializer<SimpleUser> newSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-		assertFalse(newSerializer.ensureCompatibility(configSnapshot).isRequiresMigration());
-
-		// deserialize the data and make sure this still works
-		validateDeserialization(newSerializer);
-
-		TypeSerializerSnapshot<SimpleUser> nextSnapshot = newSerializer.snapshotConfiguration();
-		final TypeSerializer<SimpleUser> nextSerializer = new AvroTypeInfo<>(SimpleUser.class, true).createSerializer(new ExecutionConfig());
-
-		assertFalse(nextSerializer.ensureCompatibility(nextSnapshot).isRequiresMigration());
-
-		// deserialize the data and make sure this still works
-		validateDeserialization(nextSerializer);
-	}
-
-	private static void validateDeserialization(TypeSerializer<SimpleUser> serializer) throws IOException {
-		final Random rnd = new Random(RANDOM_SEED);
-
-		try (InputStream in = BackwardsCompatibleAvroSerializerTest.class.getClassLoader()
-				.getResourceAsStream(DATA_RESOURCE)) {
-
-			final DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
-
-			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-				final SimpleUser deserialized = serializer.deserialize(inView);
-
-				// deterministically generate a reference record
-				final SimpleUser reference = TestDataGenerator.generateRandomSimpleUser(rnd);
-
-				assertEquals(reference, deserialized);
-			}
-		}
-	}
-
-// run this code to generate the test data
-//	public static void main(String[] args) throws Exception {
-//
-//		AvroTypeInfo<SimpleUser> typeInfo = new AvroTypeInfo<>(SimpleUser.class);
-//
-//		TypeSerializer<SimpleUser> serializer = typeInfo.createPojoSerializer(new ExecutionConfig());
-//		TypeSerializerConfigSnapshot confSnapshot = serializer.snapshotConfiguration();
-//
-//		try (FileOutputStream fos = new FileOutputStream(SNAPSHOT_RESOURCE_WRITER)) {
-//			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos);
-//
-//			TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
-//					out,
-//					Collections.singletonList(
-//							new Tuple2<>(serializer, confSnapshot)));
-//		}
-//
-//		try (FileOutputStream fos = new FileOutputStream(DATA_RESOURCE_WRITER)) {
-//			final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(fos);
-//			final Random rnd = new Random(RANDOM_SEED);
-//
-//			for (int i = 0; i < NUM_DATA_ENTRIES; i++) {
-//				serializer.serialize(TestDataGenerator.generateRandomSimpleUser(rnd), out);
-//			}
-//		}
-//	}
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services