You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:35 UTC

[12/21] flink git commit: [FLINK-7420] [avro] Replace GenericData.Array by dummy when reading TypeSerializers

[FLINK-7420] [avro] Replace GenericData.Array by dummy when reading TypeSerializers

This also adds a new test that verifies that we correctly register
Avro Serializers when they are present and modifies an existing test to
verify that we correctly register dummy classes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29249b2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29249b2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29249b2e

Branch: refs/heads/master
Commit: 29249b2eeb9cb9910a5a55ae6c3a0b648d67d2b5
Parents: db7c70f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 25 17:38:24 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 3 16:40:34 2017 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |   8 ++
 .../flink-connector-kafka-0.11/pom.xml          |   8 ++
 .../flink-connector-kafka-0.8/pom.xml           |   8 ++
 .../flink-connector-kafka-0.9/pom.xml           |   8 ++
 .../TypeSerializerSerializationUtil.java        |  23 +++-
 ...ryoRegistrationSerializerConfigSnapshot.java |   2 +-
 .../kryo/KryoSerializerCompatibilityTest.java   | 125 +++++++++++++++++++
 .../type-with-avro-serialized-using-kryo        |   1 +
 .../type-without-avro-serialized-using-kryo     | Bin 0 -> 31 bytes
 .../AvroKryoSerializerRegistrationsTest.java    | 117 +++++++++++++++++
 .../test/resources/flink_11-kryo_registrations  |  86 +++++++++++++
 flink-libraries/flink-cep/pom.xml               |   8 --
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   4 +-
 .../misc/KryoSerializerRegistrationsTest.java   |  11 ++
 pom.xml                                         |  21 ++--
 15 files changed, 404 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 2b6660d..3357591 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -95,6 +95,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 162d5d0..4f6be1d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -104,6 +104,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index c990188..b96274a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -83,6 +83,14 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka_${scala.binary.version}</artifactId>
 			<version>${kafka.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 819d590..c711c5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -91,6 +91,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 058ef46..d03498a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -74,7 +75,9 @@ public class TypeSerializerSerializationUtil {
 
 	/**
 	 * An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of
-	 * anonymous classes or our Scala serializer classes.
+	 * anonymous classes or our Scala serializer classes and also replaces occurrences of GenericData.Array
+	 * (from Avro) by a dummy class so that the KryoSerializer can still be deserialized without
+	 * Avro being on the classpath.
 	 *
 	 * <p>The {@link TypeSerializerSerializationProxy} uses this specific object input stream to read serializers,
 	 * so that mismatching serialVersionUIDs of anonymous classes / Scala serializers are ignored.
@@ -83,9 +86,9 @@ public class TypeSerializerSerializationUtil {
 	 *
 	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a>
 	 */
-	public static class SerialUIDMismatchTolerantInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
+	public static class FailureTolerantObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
 
-		public SerialUIDMismatchTolerantInputStream(InputStream in, ClassLoader cl) throws IOException {
+		public FailureTolerantObjectInputStream(InputStream in, ClassLoader cl) throws IOException {
 			super(in, cl);
 		}
 
@@ -93,6 +96,16 @@ public class TypeSerializerSerializationUtil {
 		protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
 			ObjectStreamClass streamClassDescriptor = super.readClassDescriptor();
 
+			try {
+				Class.forName(streamClassDescriptor.getName(), false, classLoader);
+			} catch (ClassNotFoundException e) {
+				if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) {
+					ObjectStreamClass result = ObjectStreamClass.lookup(
+						KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class);
+					return result;
+				}
+			}
+
 			Class localClass = resolveClass(streamClassDescriptor);
 			if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass()
 				// isAnonymousClass does not work for anonymous Scala classes; additionally check by classname
@@ -433,8 +446,8 @@ public class TypeSerializerSerializationUtil {
 
 			ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
 			try (
-				SerialUIDMismatchTolerantInputStream ois =
-					new SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), userClassLoader)) {
+				FailureTolerantObjectInputStream ois =
+					new FailureTolerantObjectInputStream(new ByteArrayInputStream(buffer), userClassLoader)) {
 
 				Thread.currentThread().setContextClassLoader(userClassLoader);
 				typeSerializer = (TypeSerializer<T>) ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
index 14287ca..cdf6b23 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
@@ -217,7 +217,7 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi
 	/**
 	 * Placeholder dummy for a previously registered class that can no longer be found in classpath on restore.
 	 */
-	public static class DummyRegisteredClass {}
+	public static class DummyRegisteredClass implements Serializable {}
 
 	/**
 	 * Placeholder dummmy for a previously registered Kryo serializer that is no longer valid or in classpath on restore.

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 1cacc9e..11c95f1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -29,14 +29,20 @@ import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
 import java.io.InputStream;
+import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -44,6 +50,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class KryoSerializerCompatibilityTest {
 
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
 	@Test
 	public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
 		KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
@@ -85,6 +94,122 @@ public class KryoSerializerCompatibilityTest {
 		assertTrue(compatResult.isRequiresMigration());
 	}
 
+	@Test
+	public void testMigrationOfTypeWithAvroType() throws Exception {
+
+		/*
+		 When Avro sees the schema "{"type" : "array", "items" : "int"}" it will create a field
+		 of type List<Integer> but the actual type will be GenericData.Array<Integer>. The
+		 KryoSerializer registers a special Serializer for this type that simply deserializes
+		 as ArrayList because Kryo cannot handle GenericData.Array well. Before Flink 1.4 Avro
+		 was always in the classpath but after 1.4 it's only present if the flink-avro jar is
+		 included. This test verifies that we can still deserialize data written pre-1.4.
+		 */
+		class FakeAvroClass {
+			public List<Integer> array;
+
+			FakeAvroClass(List<Integer> array) {
+				this.array = array;
+			}
+		}
+
+		/*
+		// This has to be executed on a pre-1.4 branch to generate the binary blob
+		{
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			KryoSerializer<FakeAvroClass> kryoSerializer =
+				new KryoSerializer<>(FakeAvroClass.class, executionConfig);
+
+			try (
+				FileOutputStream f = new FileOutputStream(
+					"src/test/resources/type-with-avro-serialized-using-kryo");
+				DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(f)) {
+
+
+				GenericData.Array<Integer> array =
+					new GenericData.Array<>(10, Schema.createArray(Schema.create(Schema.Type.INT)));
+
+				array.add(10);
+				array.add(20);
+				array.add(30);
+
+				FakeAvroClass myTestClass = new FakeAvroClass(array);
+
+				kryoSerializer.serialize(myTestClass, outputView);
+			}
+		}
+		*/
+
+		{
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			KryoSerializer<FakeAvroClass> kryoSerializer =
+				new KryoSerializer<>(FakeAvroClass.class, executionConfig);
+
+			try (
+				FileInputStream f = new FileInputStream("src/test/resources/type-with-avro-serialized-using-kryo");
+				DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(f)) {
+
+				thrown.expectMessage("Could not find required Avro dependency");
+				FakeAvroClass myTestClass = kryoSerializer.deserialize(inputView);
+			}
+		}
+	}
+
+	@Test
+	public void testMigrationWithTypeDevoidOfAvroTypes() throws Exception {
+
+		class FakeClass {
+			public List<Integer> array;
+
+			FakeClass(List<Integer> array) {
+				this.array = array;
+			}
+		}
+
+		/*
+		// This has to be executed on a pre-1.4 branch to generate the binary blob
+		{
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			KryoSerializer<FakeClass> kryoSerializer =
+				new KryoSerializer<>(FakeClass.class, executionConfig);
+
+			try (
+				FileOutputStream f = new FileOutputStream(
+					"src/test/resources/type-without-avro-serialized-using-kryo");
+				DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(f)) {
+
+
+				List<Integer> array = new ArrayList<>(10);
+
+				array.add(10);
+				array.add(20);
+				array.add(30);
+
+				FakeClass myTestClass = new FakeClass(array);
+
+				kryoSerializer.serialize(myTestClass, outputView);
+			}
+		}
+		*/
+
+		{
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			KryoSerializer<FakeClass> kryoSerializer =
+				new KryoSerializer<>(FakeClass.class, executionConfig);
+
+			try (
+				FileInputStream f = new FileInputStream("src/test/resources/type-without-avro-serialized-using-kryo");
+				DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(f)) {
+
+				FakeClass myTestClass = kryoSerializer.deserialize(inputView);
+
+				assertThat(myTestClass.array.get(0), is(10));
+				assertThat(myTestClass.array.get(1), is(20));
+				assertThat(myTestClass.array.get(2), is(30));
+			}
+		}
+	}
+
 	/**
 	 * Tests that after reconfiguration, registration ids are reconfigured to
 	 * remain the same as the preceding KryoSerializer.

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/type-with-avro-serialized-using-kryo b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
new file mode 100644
index 0000000..3901024
--- /dev/null
+++ b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
@@ -0,0 +1 @@
+
(<
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-without-avro-serialized-using-kryo
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/type-without-avro-serialized-using-kryo b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo
new file mode 100644
index 0000000..d95094c
Binary files /dev/null and b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo differ

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
new file mode 100644
index 0000000..060cfdd
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the set of Kryo registrations is the same across compatible
+ * Flink versions.
+ *
+ * <p>Special version of {@code KryoSerializerRegistrationsTest} that sits in the Avro module
+ * and verifies that we correctly register Avro types at the {@link KryoSerializer} when
+ * Avro is present.
+ */
+public class AvroKryoSerializerRegistrationsTest {
+
+	/**
+	 * Tests that the registered classes in Kryo did not change.
+	 *
+	 * <p>Once we have proper serializer versioning this test will become obsolete.
+	 * But currently a change in the serializers can break savepoint backwards
+	 * compatibility between Flink versions.
+	 */
+	@Test
+	public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
+		final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();
+
+		try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+				getClass().getClassLoader().getResourceAsStream("flink_11-kryo_registrations")))) {
+
+			String line;
+			while ((line = reader.readLine()) != null) {
+				String[] split = line.split(",");
+				final int tag = Integer.parseInt(split[0]);
+				final String registeredClass = split[1];
+
+				Registration registration = kryo.getRegistration(tag);
+
+				if (registration == null) {
+					fail(String.format("Registration for %d = %s got lost", tag, registeredClass));
+				}
+				else if (!registeredClass.equals(registration.getType().getName())) {
+					fail(String.format("Registration for %d = %s changed to %s",
+							tag, registeredClass, registration.getType().getName()));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Creates a Kryo serializer and writes the default registrations out to a
+	 * comma separated file with one entry per line:
+	 *
+	 * <pre>
+	 * id,class
+	 * </pre>
+	 *
+	 * <p>The produced file is used to check that the registered IDs don't change
+	 * in future Flink versions.
+	 *
+	 * <p>This method is not used in the tests, but documents how the test file
+	 * has been created and can be used to re-create it if needed.
+	 *
+	 * @param filePath File path to write registrations to
+	 */
+	private void writeDefaultKryoRegistrations(String filePath) throws IOException {
+		final File file = new File(filePath);
+		if (file.exists()) {
+			assertTrue(file.delete());
+		}
+
+		final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();
+		final int nextId = kryo.getNextRegistrationId();
+
+		try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+			for (int i = 0; i < nextId; i++) {
+				Registration registration = kryo.getRegistration(i);
+				String str = registration.getId() + "," + registration.getType().getName();
+				writer.write(str, 0, str.length());
+				writer.newLine();
+			}
+
+			System.out.println("Created file with registrations at " + file.getAbsolutePath());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
new file mode 100644
index 0000000..7000e62
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
@@ -0,0 +1,86 @@
+0,int
+1,java.lang.String
+2,float
+3,boolean
+4,byte
+5,char
+6,short
+7,long
+8,double
+9,void
+10,scala.collection.convert.Wrappers$SeqWrapper
+11,scala.collection.convert.Wrappers$IteratorWrapper
+12,scala.collection.convert.Wrappers$MapWrapper
+13,scala.collection.convert.Wrappers$JListWrapper
+14,scala.collection.convert.Wrappers$JMapWrapper
+15,scala.Some
+16,scala.util.Left
+17,scala.util.Right
+18,scala.collection.immutable.Vector
+19,scala.collection.immutable.Set$Set1
+20,scala.collection.immutable.Set$Set2
+21,scala.collection.immutable.Set$Set3
+22,scala.collection.immutable.Set$Set4
+23,scala.collection.immutable.HashSet$HashTrieSet
+24,scala.collection.immutable.Map$Map1
+25,scala.collection.immutable.Map$Map2
+26,scala.collection.immutable.Map$Map3
+27,scala.collection.immutable.Map$Map4
+28,scala.collection.immutable.HashMap$HashTrieMap
+29,scala.collection.immutable.Range$Inclusive
+30,scala.collection.immutable.NumericRange$Inclusive
+31,scala.collection.immutable.NumericRange$Exclusive
+32,scala.collection.mutable.BitSet
+33,scala.collection.mutable.HashMap
+34,scala.collection.mutable.HashSet
+35,scala.collection.convert.Wrappers$IterableWrapper
+36,scala.Tuple1
+37,scala.Tuple2
+38,scala.Tuple3
+39,scala.Tuple4
+40,scala.Tuple5
+41,scala.Tuple6
+42,scala.Tuple7
+43,scala.Tuple8
+44,scala.Tuple9
+45,scala.Tuple10
+46,scala.Tuple11
+47,scala.Tuple12
+48,scala.Tuple13
+49,scala.Tuple14
+50,scala.Tuple15
+51,scala.Tuple16
+52,scala.Tuple17
+53,scala.Tuple18
+54,scala.Tuple19
+55,scala.Tuple20
+56,scala.Tuple21
+57,scala.Tuple22
+58,scala.Tuple1$mcJ$sp
+59,scala.Tuple1$mcI$sp
+60,scala.Tuple1$mcD$sp
+61,scala.Tuple2$mcJJ$sp
+62,scala.Tuple2$mcJI$sp
+63,scala.Tuple2$mcJD$sp
+64,scala.Tuple2$mcIJ$sp
+65,scala.Tuple2$mcII$sp
+66,scala.Tuple2$mcID$sp
+67,scala.Tuple2$mcDJ$sp
+68,scala.Tuple2$mcDI$sp
+69,scala.Tuple2$mcDD$sp
+70,scala.Symbol
+71,scala.reflect.ClassTag
+72,scala.runtime.BoxedUnit
+73,java.util.Arrays$ArrayList
+74,java.util.BitSet
+75,java.util.PriorityQueue
+76,java.util.regex.Pattern
+77,java.sql.Date
+78,java.sql.Time
+79,java.sql.Timestamp
+80,java.net.URI
+81,java.net.InetSocketAddress
+82,java.util.UUID
+83,java.util.Locale
+84,java.text.SimpleDateFormat
+85,org.apache.avro.generic.GenericData$Array

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index bd57d17..a561cca 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -88,14 +88,6 @@ under the License.
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-
-        <!-- we include Avro to make the CEPMigrationTest work, it uses a Kryo-serialized savepoint (see FLINK-7420) -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-avro_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
     
     <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index dc322c3..c333397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -168,8 +168,8 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 			DataInputViewStream dis = new DataInputViewStream(in);
 			ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
 			try (
-				TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois =
-					new TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, userCodeClassLoader)) {
+				TypeSerializerSerializationUtil.FailureTolerantObjectInputStream ois =
+					new TypeSerializerSerializationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) {
 
 				Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 				TypeSerializer<S> stateSerializer = (TypeSerializer<S>) ois.readObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
index 77d2a1a..cbe9394 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
@@ -33,6 +33,8 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -72,6 +74,15 @@ public class KryoSerializerRegistrationsTest {
 				if (registration == null) {
 					fail(String.format("Registration for %d = %s got lost", tag, registeredClass));
 				}
+				else if (registeredClass.equals("org.apache.avro.generic.GenericData$Array")) {
+					// starting with Flink 1.4 Avro is no longer a dependency of core. Avro is
+					// only available if flink-avro is present. There is a special version of
+					// this test in AvroKryoSerializerRegistrationsTest that verifies correct
+					// registration of Avro types if present
+					assertThat(
+						registration.getType().getName(),
+						is("org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass"));
+				}
 				else if (!registeredClass.equals(registration.getType().getName())) {
 					fail(String.format("Registration for %d = %s changed to %s",
 							tag, registeredClass, registration.getType().getName()));

http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 773dc34..b93251b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,11 +215,11 @@ under the License.
 	</dependencies>
 
 	<!-- this section defines the module versions that are used if nothing else is specified. -->
-	
+
 	<dependencyManagement>
-		<!-- WARN: 
-			DO NOT put 	guava, 
-						protobuf, 
+		<!-- WARN:
+			DO NOT put 	guava,
+						protobuf,
 						asm,
 						netty
 					here. It will overwrite Hadoop's guava dependency (even though we handle it
@@ -367,7 +367,7 @@ under the License.
 				<artifactId>joda-convert</artifactId>
 				<version>1.7</version>
 			</dependency>
-			
+
 			<!-- kryo used in different versions by Flink an chill -->
 			<dependency>
 				<groupId>com.esotericsoftware.kryo</groupId>
@@ -579,7 +579,7 @@ under the License.
 									<outputDir>${project.build.directory}/spotbugs</outputDir>
 									<!-- A list of available stylesheets can be found here: https://github.com/findbugsproject/findbugs/tree/master/findbugs/src/xsl -->
 									<stylesheet>plain.xsl</stylesheet>
-									
+
 									<fileMappers>
 										<fileMapper
 											implementation="org.codehaus.plexus.components.io.filemappers.FileExtensionMapper">
@@ -772,7 +772,7 @@ under the License.
 				</plugins>
 			</build>
 		</profile>
-		
+
 		<profile>
 			<!--japicmp 0.7 does not support deactivation from the command
 				line, so we have to use a workaround with profiles instead.
@@ -842,7 +842,7 @@ under the License.
 				</dependency>
 			</dependencies>
 		</profile>
-		
+
 		<profile>
 			<id>release</id>
 			<properties>
@@ -1027,6 +1027,7 @@ under the License.
 
 						<!-- Test Data. -->
 						<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
+                        <exclude>flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations</exclude>
 						<exclude>flink-runtime/src/test/resources/flink_11-kryo_registrations</exclude>
 						<exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude>
 						<exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude>
@@ -1287,7 +1288,7 @@ under the License.
 			</plugin>
 		</plugins>
 
-		<!-- Plugin configurations for plugins activated in sub-projects --> 
+		<!-- Plugin configurations for plugins activated in sub-projects -->
 
 		<pluginManagement>
 			<plugins>
@@ -1310,7 +1311,7 @@ under the License.
 					<artifactId>maven-shade-plugin</artifactId>
 					<version>2.4.1</version>
 				</plugin>
-				
+
 				<!-- Disable certain plugins in Eclipse -->
 				<plugin>
 					<groupId>org.eclipse.m2e</groupId>