You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/29 07:16:09 UTC

[flink] branch release-1.7 updated: [FLINK-11436] [avro] Manually Java-deserialize AvroSerializer for backwards compatibility

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new 4821168  [FLINK-11436] [avro] Manually Java-deserialize AvroSerializer for backwards compatibility
4821168 is described below

commit 482116806bf9c4159a505d2c013dcc298e7c8174
Author: Igal Shilman <ig...@data-artisans.com>
AuthorDate: Sun Jan 27 08:35:52 2019 +0100

    [FLINK-11436] [avro] Manually Java-deserialize AvroSerializer for backwards compatibility
    
    During the release of Flink 1.7, the value of serialVersionUID was uptick to 2L (was 1L before)
    And although the AvroSerializer (along with it's snapshot class) were migrated to the new serialization
    abstraction (hence free from Java serialization), there were composite serializers that were not migrated
    and were serialized with Java serialization.
    This commit manually Java-Deserializes the AvroSerializer to support backwards compatability.
    
    This closes #7580.
---
 .../org/apache/flink/util/InstantiationUtil.java   |  11 +-
 .../flink/formats/avro/typeutils/AvroFactory.java  |   6 +-
 .../formats/avro/typeutils/AvroSerializer.java     | 106 +++++++++++-
 .../typeutils/AvroSerializerMigrationTest.java     | 187 +++++++++++++++++++++
 4 files changed, 296 insertions(+), 14 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
index eab8f4c..f4600a2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java
@@ -210,11 +210,12 @@ public final class InstantiationUtil {
 
 			final Class localClass = resolveClass(streamClassDescriptor);
 			final String name = localClass.getName();
-			if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name) || isAnonymousClass(localClass)) {
+			if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name) || isAnonymousClass(localClass)
+				|| isOldAvroSerializer(name, streamClassDescriptor.getSerialVersionUID())) {
 				final ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
 				if (localClassDescriptor != null
 					&& localClassDescriptor.getSerialVersionUID() != streamClassDescriptor.getSerialVersionUID()) {
-					LOG.warn("Ignoring serialVersionUID mismatch for anonymous class {}; was {}, now {}.",
+					LOG.warn("Ignoring serialVersionUID mismatch for class {}; was {}, now {}.",
 						streamClassDescriptor.getName(), streamClassDescriptor.getSerialVersionUID(), localClassDescriptor.getSerialVersionUID());
 
 					streamClassDescriptor = localClassDescriptor;
@@ -223,6 +224,7 @@ public final class InstantiationUtil {
 
 			return streamClassDescriptor;
 		}
+
 	}
 
 	private static boolean isAnonymousClass(Class clazz) {
@@ -242,6 +244,11 @@ public final class InstantiationUtil {
 		}
 	}
 
+	private static boolean isOldAvroSerializer(String name, long serialVersionUID) {
+		// please see FLINK-11436 for details on why we need to ignore serial version UID here for the AvroSerializer
+		return (serialVersionUID == 1) && "org.apache.flink.formats.avro.typeutils.AvroSerializer".equals(name);
+	}
+
 	/**
 	 * A mapping between the full path of a deprecated serializer and its equivalent.
 	 * These mappings are hardcoded and fixed.
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
index 0ca25bf..9a8bdcb 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java
@@ -86,9 +86,9 @@ final class AvroFactory<T> {
 		return fromReflective(type, cl, Optional.ofNullable(previousSchema));
 	}
 
-	static <T> AvroFactory<T> createFromTypeAndSchemaString(Class<T> type, @Nullable String schemaString) {
-		Schema schema = (schemaString != null) ? new Schema.Parser().parse(schemaString) : null;
-		return create(type, schema, null);
+	@Nullable
+	static Schema parseSchemaString(@Nullable String schemaString) {
+		return (schemaString == null) ? null : new Schema.Parser().parse(schemaString);
 	}
 
 	@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 2be660a..da51117 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -32,12 +32,15 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.reflect.Nullable;
 import org.apache.avro.specific.SpecificRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.io.ObjectInputStream;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,11 +78,9 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
 	// -------- configuration fields, serializable -----------
 
-	/** The class of the type that is serialized by this serializer.
-	 */
-	private final Class<T> type;
-	private final SerializableAvroSchema schema;
-	private final SerializableAvroSchema previousSchema;
+	@Nonnull private Class<T> type;
+	@Nonnull private SerializableAvroSchema schema;
+	@Nonnull private SerializableAvroSchema previousSchema;
 
 	// -------- runtime fields, non-serializable, lazily initialized -----------
 
@@ -127,10 +128,10 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 	 * Creates a new AvroSerializer for the type indicated by the given class.
 	 */
 	@Internal
-	AvroSerializer(Class<T> type, @Nullable SerializableAvroSchema newSchema, @Nullable SerializableAvroSchema previousSchema) {
+	AvroSerializer(Class<T> type, SerializableAvroSchema newSchema, SerializableAvroSchema previousSchema) {
 		this.type = checkNotNull(type);
-		this.schema = newSchema;
-		this.previousSchema = previousSchema;
+		this.schema = checkNotNull(newSchema);
+		this.previousSchema = checkNotNull(previousSchema);
 	}
 
 	/**
@@ -144,6 +145,7 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
 	// ------------------------------------------------------------------------
 
+	@Nonnull
 	public Class<T> getType() {
 		return type;
 	}
@@ -381,4 +383,90 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 		}
 
 	}
+
+	// -------- backwards compatibility with 1.5, 1.6 -----------
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		/*
+		Please see FLINK-11436 for details on why manual deserialization is required.
+
+		During the release of Flink 1.7, the value of serialVersionUID was uptick to 2L (was 1L before)
+		And although the AvroSerializer (along with it's snapshot class) were migrated to the new serialization
+		abstraction (hence free from Java serialization), there were composite serializers that were not migrated
+		and were serialized with Java serialization. In case that one of the nested serializers were Avro we would
+		bump into deserialization exception due to a wrong serialVersionUID. Unfortunately it is not possible to revert
+		the serialVersionUID back to 1L, because users might have snapshots with 2L present already.
+		To overcome this we first need to make sure that the AvroSerializer is being Java deserialized with
+		FailureTolerantObjectInputStream, and then we determine the serialized layout by looking at the fields.
+
+		From: https://docs.oracle.com/javase/8/docs/platform/serialization/spec/class.html#a5421
+		-------------------------------------------------------------------------------------------------------------
+		The descriptors for primitive typed fields are written first
+		sorted by field name followed by descriptors for the object typed fields sorted by field name.
+		The names are sorted using String.compareTo.
+		-------------------------------------------------------------------------------------------------------------
+
+		pre 1.6		field order:   	[type]
+		pre 1.7 	field order:   	[schemaString, 		type]
+		post 1.7 	field order:	[previousSchema,	schema,		type]
+
+		We would use the first field to distinguish between the three different layouts.
+		To complicate things even further in pre 1.7, the field @schemaString could be
+		null or a string, but, in post 1.7, the field @previousSchema was never set to null, therefore
+		we can use the first field to determine the version.
+
+		this logic should stay here as long as we support Flink 1.6 (along with Java serialized
+		TypeSerializers)
+		*/
+		final Object firstField = in.readObject();
+
+		if (firstField == null) {
+			// first field can only be NULL in 1.6 (schemaString)
+			read16Layout(null, in);
+		}
+		else if (firstField instanceof String) {
+			// first field is a String only in 1.6 (schemaString)
+			read16Layout((String) firstField, in);
+		}
+		else if (firstField instanceof Class<?>) {
+			// first field is a Class<?> only in 1.5 (type)
+			@SuppressWarnings("unchecked") Class<T> type = (Class<T>) firstField;
+			read15Layout(type);
+		}
+		else if (firstField instanceof SerializableAvroSchema) {
+			readCurrentLayout((SerializableAvroSchema) firstField, in);
+		}
+		else {
+			throw new IllegalStateException("Failed to Java-Deserialize an AvroSerializer instance. " +
+				"Was expecting a first field to be either a String or SerializableAvroSchema, but got: " +
+				"" + firstField.getClass());
+		}
+	}
+
+	private void read15Layout(Class<T> type) {
+		this.previousSchema = new SerializableAvroSchema();
+		this.schema = new SerializableAvroSchema();
+		this.type = type;
+	}
+
+	@SuppressWarnings("unchecked")
+	private void read16Layout(@Nullable String schemaString, ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+
+		Schema schema = AvroFactory.parseSchemaString(schemaString);
+		Class<T> type = (Class<T>) in.readObject();
+
+		this.previousSchema = new SerializableAvroSchema();
+		this.schema = new SerializableAvroSchema(schema);
+		this.type = type;
+	}
+
+	@SuppressWarnings("unchecked")
+	private void readCurrentLayout(SerializableAvroSchema previousSchema, ObjectInputStream in)
+			throws IOException, ClassNotFoundException {
+
+		this.previousSchema = previousSchema;
+		this.schema = (SerializableAvroSchema) in.readObject();
+		this.type = (Class<T>) in.readObject();
+	}
 }
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
index a5f2a8e..8b35a92 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerMigrationTest.java
@@ -18,15 +18,33 @@
 
 package org.apache.flink.formats.avro.typeutils;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
+import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
 import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.Arrays;
+import java.util.Base64;
 import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static junit.framework.TestCase.assertSame;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 /**
  * Tests migrations for {@link AvroSerializerSnapshot}.
@@ -62,4 +80,173 @@ public class AvroSerializerMigrationTest extends TypeSerializerSnapshotMigration
 		);
 	}
 
+	// ---------------------------------------------------------------------------------------------------------------
+	// The following batch of tests are making sure that AvroSerializer class is able to be Java-Deserialized.
+	// see [FLINK-11436] for more information.
+
+	// Once we drop support for versions that carried snapshots with Java-Deserialized serializers we can drop this
+	// batch of tests.
+	// ---------------------------------------------------------------------------------------------------------------
+
+	@Test
+	public void javaDeserializeFromFlink_1_5_ReflectiveRecord() throws IOException {
+		final String avroSerializerBase64 = "AAAAAQAAAQis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" +
+			"U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n" +
+			"aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n" +
+			"Tm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJvU2VyaWFsaXplck1pZ3Jh\n" +
+			"dGlvblRlc3QkU2ltcGxlUG9qbwAAAAAAAAAAAAAAeHA=";
+
+		TypeSerializer<?> serializer = javaDeserialize(avroSerializerBase64);
+		assertThat(serializer, instanceOf(AvroSerializer.class));
+
+		AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64);
+		assertSame(avroSerializer.getType(), SimplePojo.class);
+		assertThat(avroSerializer.getAvroSchema(), notNullValue());
+	}
+
+	@Test
+	public void javaDeserializeFromFlink_1_5_SpecificRecord() throws IOException {
+		final String avroSerializerBase64 = "AAAAAQAAASOs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" +
+			"U2VyaWFsaXplcgAAAAAAAAABAgABTAAEdHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFj\n" +
+			"aGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdnIA\n" +
+			"L29yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLmdlbmVyYXRlZC5BZGRyZXNz7Paj+KjgQ2oMAAB4\n" +
+			"cgArb3JnLmFwYWNoZS5hdnJvLnNwZWNpZmljLlNwZWNpZmljUmVjb3JkQmFzZQKi+azGtzQdDAAAeHA=";
+
+		TypeSerializer<?> serializer = javaDeserialize(avroSerializerBase64);
+		assertThat(serializer, instanceOf(AvroSerializer.class));
+
+		AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64);
+		assertSame(avroSerializer.getType(), Address.class);
+		assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+	}
+
+	@Test
+	public void javaDeserializeFromFlink_1_6() throws IOException {
+		final String avroSerializer = "AAAAAQAAAUis7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" +
+			"U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n" +
+			"dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n" +
+			"dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwcHZyAC9vcmcuYXBhY2hlLmZsaW5rLmZvcm1h\n" +
+			"dHMuYXZyby5nZW5lcmF0ZWQuQWRkcmVzc+z2o/io4ENqDAAAeHIAK29yZy5hcGFjaGUuYXZyby5zcGVj\n" +
+			"aWZpYy5TcGVjaWZpY1JlY29yZEJhc2UCovmsxrc0HQwAAHhw";
+
+		TypeSerializer<?> avro = javaDeserialize(avroSerializer);
+
+		assertThat(avro, instanceOf(AvroSerializer.class));
+	}
+
+	@Test
+	public void javaDeserializeFromFlink_1_6_GenericRecord() throws IOException {
+		String avroSerializerBase64 = "AAAAAQAAAges7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" +
+			"U2VyaWFsaXplcgAAAAAAAAABAgACTAAMc2NoZW1hU3RyaW5ndAASTGphdmEvbGFuZy9TdHJpbmc7TAAE\n" +
+			"dHlwZXQAEUxqYXZhL2xhbmcvQ2xhc3M7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBl\n" +
+			"dXRpbHMuVHlwZVNlcmlhbGl6ZXIAAAAAAAAAAQIAAHhwdAEBeyJ0eXBlIjoicmVjb3JkIiwibmFtZSI6\n" +
+			"IkFkZHJlc3MiLCJuYW1lc3BhY2UiOiJvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby5nZW5lcmF0\n" +
+			"ZWQiLCJmaWVsZHMiOlt7Im5hbWUiOiJudW0iLCJ0eXBlIjoiaW50In0seyJuYW1lIjoic3RyZWV0Iiwi\n" +
+			"dHlwZSI6InN0cmluZyJ9LHsibmFtZSI6ImNpdHkiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoic3Rh\n" +
+			"dGUiLCJ0eXBlIjoic3RyaW5nIn0seyJuYW1lIjoiemlwIiwidHlwZSI6InN0cmluZyJ9XX12cgAlb3Jn\n" +
+			"LmFwYWNoZS5hdnJvLmdlbmVyaWMuR2VuZXJpY1JlY29yZAAAAAAAAAAAAAAAeHA=";
+
+		TypeSerializer<?> serializer = javaDeserialize(avroSerializerBase64);
+
+		AvroSerializer<?> avroSerializer = (AvroSerializer<?>) serializer;
+		assertSame(avroSerializer.getType(), GenericRecord.class);
+		assertThat(avroSerializer.getAvroSchema(), notNullValue());
+	}
+
+	@Test
+	public void javaDeserializeFromFlink_1_7() throws IOException {
+		String avroSerializerBase64 = "AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" +
+			"U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n" +
+			"b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n" +
+			"AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n" +
+			"cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n" +
+			"YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n" +
+			"AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n" +
+			"9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n" +
+			"rMa3NB0MAAB4cA==";
+
+		AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64);
+		assertSame(avroSerializer.getType(), Address.class);
+		assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+	}
+
+	@Test
+	public void javaDeserializeFromFlink_1_7_afterInitialization() throws IOException {
+		String avroSerializerBase64 = "AAAAAQAAAeKs7QAFc3IANm9yZy5hcGFjaGUuZmxpbmsuZm9ybWF0cy5hdnJvLnR5cGV1dGlscy5BdnJv\n" +
+			"U2VyaWFsaXplcgAAAAAAAAACAgADTAAOcHJldmlvdXNTY2hlbWF0AEBMb3JnL2FwYWNoZS9mbGluay9m\n" +
+			"b3JtYXRzL2F2cm8vdHlwZXV0aWxzL1NlcmlhbGl6YWJsZUF2cm9TY2hlbWE7TAAGc2NoZW1hcQB+AAFM\n" +
+			"AAR0eXBldAARTGphdmEvbGFuZy9DbGFzczt4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5\n" +
+			"cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA+b3JnLmFwYWNoZS5mbGluay5mb3Jt\n" +
+			"YXRzLmF2cm8udHlwZXV0aWxzLlNlcmlhbGl6YWJsZUF2cm9TY2hlbWEAAAAAAAAAAQMAAHhwdwEAeHNx\n" +
+			"AH4ABXcBAHh2cgAvb3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkLkFkZHJlc3Ps\n" +
+			"9qP4qOBDagwAAHhyACtvcmcuYXBhY2hlLmF2cm8uc3BlY2lmaWMuU3BlY2lmaWNSZWNvcmRCYXNlAqL5\n" +
+			"rMa3NB0MAAB4cA==";
+
+		AvroSerializer<?> avroSerializer = (AvroSerializer<?>) javaDeserialize(avroSerializerBase64);
+		assertSame(avroSerializer.getType(), Address.class);
+		assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+	}
+
+	@Test
+	public void compositeSerializerFromFlink_1_6_WithNestedAvroSerializer() throws IOException {
+		String streamElementSerializerBase64 = "AAAAAQAAAq2s7QAFc3IAR29yZy5hcGFjaGUuZmxpbmsuc3RyZWFtaW5nLnJ1bnRpbWUuc3RyZWFtcmVj\n" +
+			"b3JkLlN0cmVhbUVsZW1lbnRTZXJpYWxpemVyAAAAAAAAAAECAAFMAA50eXBlU2VyaWFsaXplcnQANkxv\n" +
+			"cmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hyADRvcmcu\n" +
+			"YXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLlR5cGVTZXJpYWxpemVyAAAAAAAAAAECAAB4\n" +
+			"cHNyADZvcmcuYXBhY2hlLmZsaW5rLmZvcm1hdHMuYXZyby50eXBldXRpbHMuQXZyb1NlcmlhbGl6ZXIA\n" +
+			"AAAAAAAAAQIAAkwADHNjaGVtYVN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO0wABHR5cGV0ABFMamF2\n" +
+			"YS9sYW5nL0NsYXNzO3hxAH4AAnQBAXsidHlwZSI6InJlY29yZCIsIm5hbWUiOiJBZGRyZXNzIiwibmFt\n" +
+			"ZXNwYWNlIjoib3JnLmFwYWNoZS5mbGluay5mb3JtYXRzLmF2cm8uZ2VuZXJhdGVkIiwiZmllbGRzIjpb\n" +
+			"eyJuYW1lIjoibnVtIiwidHlwZSI6ImludCJ9LHsibmFtZSI6InN0cmVldCIsInR5cGUiOiJzdHJpbmci\n" +
+			"fSx7Im5hbWUiOiJjaXR5IiwidHlwZSI6InN0cmluZyJ9LHsibmFtZSI6InN0YXRlIiwidHlwZSI6InN0\n" +
+			"cmluZyJ9LHsibmFtZSI6InppcCIsInR5cGUiOiJzdHJpbmcifV19dnIAJW9yZy5hcGFjaGUuYXZyby5n\n" +
+			"ZW5lcmljLkdlbmVyaWNSZWNvcmQAAAAAAAAAAAAAAHhw";
+
+		StreamElementSerializer<?> ser = (StreamElementSerializer<?>) javaDeserialize(streamElementSerializerBase64);
+		TypeSerializer<?> containedTypeSerializer = ser.getContainedTypeSerializer();
+
+		assertThat(containedTypeSerializer, instanceOf(AvroSerializer.class));
+
+		AvroSerializer<?> avroSerializer = (AvroSerializer<?>) containedTypeSerializer;
+		assertSame(avroSerializer.getType(), GenericRecord.class);
+		assertThat(avroSerializer.getAvroSchema(), is(Address.SCHEMA$));
+	}
+
+	@Test
+	public void makeSureThatFieldsWereNotChanged() {
+		// This test should be removed once we completely migrate all the composite serializers.
+
+		List<String> serializedFieldNames = Arrays.stream(AvroSerializer.class.getDeclaredFields())
+			.filter(field -> !Modifier.isTransient(field.getModifiers()))
+			.filter(field -> !Modifier.isStatic(field.getModifiers()))
+			.map(Field::getName)
+			.sorted()
+			.collect(Collectors.toList());
+
+		assertThat(serializedFieldNames, is(asList("previousSchema", "schema", "type")));
+	}
+
+	@SuppressWarnings("deprecation")
+	private static TypeSerializer<?> javaDeserialize(String base64) throws IOException {
+		byte[] bytes = Base64.getMimeDecoder().decode(base64);
+		DataInputDeserializer in = new DataInputDeserializer(bytes);
+		return TypeSerializerSerializationUtil.tryReadSerializer(in, Thread.currentThread().getContextClassLoader());
+	}
+
+	/**
+	 * A simple pojo used in these tests.
+	 */
+	public static class SimplePojo {
+		private String foo;
+
+		@SuppressWarnings("unused")
+		public String getFoo() {
+			return foo;
+		}
+
+		@SuppressWarnings("unused")
+		public void setFoo(String foo) {
+			this.foo = foo;
+		}
+	}
 }