You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:35 UTC
[12/21] flink git commit: [FLINK-7420] [avro] Replace
GenericData.Array by dummy when reading TypeSerializers
[FLINK-7420] [avro] Replace GenericData.Array by dummy when reading TypeSerializers
This also adds a new test that verifies that we correctly register
Avro Serializers when they are present and modifies an existing test to
verify that we correctly register dummy classes.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/29249b2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/29249b2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/29249b2e
Branch: refs/heads/master
Commit: 29249b2eeb9cb9910a5a55ae6c3a0b648d67d2b5
Parents: db7c70f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 25 17:38:24 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Nov 3 16:40:34 2017 +0100
----------------------------------------------------------------------
.../flink-connector-kafka-0.10/pom.xml | 8 ++
.../flink-connector-kafka-0.11/pom.xml | 8 ++
.../flink-connector-kafka-0.8/pom.xml | 8 ++
.../flink-connector-kafka-0.9/pom.xml | 8 ++
.../TypeSerializerSerializationUtil.java | 23 +++-
...ryoRegistrationSerializerConfigSnapshot.java | 2 +-
.../kryo/KryoSerializerCompatibilityTest.java | 125 +++++++++++++++++++
.../type-with-avro-serialized-using-kryo | 1 +
.../type-without-avro-serialized-using-kryo | Bin 0 -> 31 bytes
.../AvroKryoSerializerRegistrationsTest.java | 117 +++++++++++++++++
.../test/resources/flink_11-kryo_registrations | 86 +++++++++++++
flink-libraries/flink-cep/pom.xml | 8 --
...ckendStateMetaInfoSnapshotReaderWriters.java | 4 +-
.../misc/KryoSerializerRegistrationsTest.java | 11 ++
pom.xml | 21 ++--
15 files changed, 404 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 2b6660d..3357591 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -95,6 +95,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 162d5d0..4f6be1d 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -104,6 +104,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index c990188..b96274a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -83,6 +83,14 @@ under the License.
</dependency>
<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>${kafka.version}</version>
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index 819d590..c711c5f 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -91,6 +91,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 058ef46..d03498a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeutils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
@@ -74,7 +75,9 @@ public class TypeSerializerSerializationUtil {
/**
* An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of
- * anonymous classes or our Scala serializer classes.
+ * anonymous classes or our Scala serializer classes and also replaces occurrences of GenericData.Array
+ * (from Avro) by a dummy class so that the KryoSerializer can still be deserialized without
+ * Avro being on the classpath.
*
* <p>The {@link TypeSerializerSerializationProxy} uses this specific object input stream to read serializers,
* so that mismatching serialVersionUIDs of anonymous classes / Scala serializers are ignored.
@@ -83,9 +86,9 @@ public class TypeSerializerSerializationUtil {
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a>
*/
- public static class SerialUIDMismatchTolerantInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
+ public static class FailureTolerantObjectInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
- public SerialUIDMismatchTolerantInputStream(InputStream in, ClassLoader cl) throws IOException {
+ public FailureTolerantObjectInputStream(InputStream in, ClassLoader cl) throws IOException {
super(in, cl);
}
@@ -93,6 +96,16 @@ public class TypeSerializerSerializationUtil {
protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
ObjectStreamClass streamClassDescriptor = super.readClassDescriptor();
+ try {
+ Class.forName(streamClassDescriptor.getName(), false, classLoader);
+ } catch (ClassNotFoundException e) {
+ if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) {
+ ObjectStreamClass result = ObjectStreamClass.lookup(
+ KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class);
+ return result;
+ }
+ }
+
Class localClass = resolveClass(streamClassDescriptor);
if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass()
// isAnonymousClass does not work for anonymous Scala classes; additionally check by classname
@@ -433,8 +446,8 @@ public class TypeSerializerSerializationUtil {
ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
try (
- SerialUIDMismatchTolerantInputStream ois =
- new SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), userClassLoader)) {
+ FailureTolerantObjectInputStream ois =
+ new FailureTolerantObjectInputStream(new ByteArrayInputStream(buffer), userClassLoader)) {
Thread.currentThread().setContextClassLoader(userClassLoader);
typeSerializer = (TypeSerializer<T>) ois.readObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
index 14287ca..cdf6b23 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistrationSerializerConfigSnapshot.java
@@ -217,7 +217,7 @@ public abstract class KryoRegistrationSerializerConfigSnapshot<T> extends Generi
/**
* Placeholder dummy for a previously registered class that can no longer be found in classpath on restore.
*/
- public static class DummyRegisteredClass {}
+ public static class DummyRegisteredClass implements Serializable {}
/**
* Placeholder dummy for a previously registered Kryo serializer that is no longer valid or in classpath on restore.
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 1cacc9e..11c95f1 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -29,14 +29,20 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
import java.io.InputStream;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
@@ -44,6 +50,9 @@ import static org.junit.Assert.assertTrue;
*/
public class KryoSerializerCompatibilityTest {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
@Test
public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
KryoSerializer<TestClass> kryoSerializerForA = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
@@ -85,6 +94,122 @@ public class KryoSerializerCompatibilityTest {
assertTrue(compatResult.isRequiresMigration());
}
+ @Test
+ public void testMigrationOfTypeWithAvroType() throws Exception {
+
+ /*
+ When Avro sees the schema "{"type" : "array", "items" : "int"}" it will create a field
+ of type List<Integer> but the actual type will be GenericData.Array<Integer>. The
+ KryoSerializer registers a special Serializer for this type that simply deserializes
+ as ArrayList because Kryo cannot handle GenericData.Array well. Before Flink 1.4 Avro
+ was always in the classpath but after 1.4 it's only present if the flink-avro jar is
+ included. This test verifies that we can still deserialize data written pre-1.4.
+ */
+ class FakeAvroClass {
+ public List<Integer> array;
+
+ FakeAvroClass(List<Integer> array) {
+ this.array = array;
+ }
+ }
+
+ /*
+ // This has to be executed on a pre-1.4 branch to generate the binary blob
+ {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ KryoSerializer<FakeAvroClass> kryoSerializer =
+ new KryoSerializer<>(FakeAvroClass.class, executionConfig);
+
+ try (
+ FileOutputStream f = new FileOutputStream(
+ "src/test/resources/type-with-avro-serialized-using-kryo");
+ DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(f)) {
+
+
+ GenericData.Array<Integer> array =
+ new GenericData.Array<>(10, Schema.createArray(Schema.create(Schema.Type.INT)));
+
+ array.add(10);
+ array.add(20);
+ array.add(30);
+
+ FakeAvroClass myTestClass = new FakeAvroClass(array);
+
+ kryoSerializer.serialize(myTestClass, outputView);
+ }
+ }
+ */
+
+ {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ KryoSerializer<FakeAvroClass> kryoSerializer =
+ new KryoSerializer<>(FakeAvroClass.class, executionConfig);
+
+ try (
+ FileInputStream f = new FileInputStream("src/test/resources/type-with-avro-serialized-using-kryo");
+ DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(f)) {
+
+ thrown.expectMessage("Could not find required Avro dependency");
+ FakeAvroClass myTestClass = kryoSerializer.deserialize(inputView);
+ }
+ }
+ }
+
+ @Test
+ public void testMigrationWithTypeDevoidOfAvroTypes() throws Exception {
+
+ class FakeClass {
+ public List<Integer> array;
+
+ FakeClass(List<Integer> array) {
+ this.array = array;
+ }
+ }
+
+ /*
+ // This has to be executed on a pre-1.4 branch to generate the binary blob
+ {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ KryoSerializer<FakeClass> kryoSerializer =
+ new KryoSerializer<>(FakeClass.class, executionConfig);
+
+ try (
+ FileOutputStream f = new FileOutputStream(
+ "src/test/resources/type-without-avro-serialized-using-kryo");
+ DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(f)) {
+
+
+ List<Integer> array = new ArrayList<>(10);
+
+ array.add(10);
+ array.add(20);
+ array.add(30);
+
+ FakeClass myTestClass = new FakeClass(array);
+
+ kryoSerializer.serialize(myTestClass, outputView);
+ }
+ }
+ */
+
+ {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ KryoSerializer<FakeClass> kryoSerializer =
+ new KryoSerializer<>(FakeClass.class, executionConfig);
+
+ try (
+ FileInputStream f = new FileInputStream("src/test/resources/type-without-avro-serialized-using-kryo");
+ DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(f)) {
+
+ FakeClass myTestClass = kryoSerializer.deserialize(inputView);
+
+ assertThat(myTestClass.array.get(0), is(10));
+ assertThat(myTestClass.array.get(1), is(20));
+ assertThat(myTestClass.array.get(2), is(30));
+ }
+ }
+ }
+
/**
* Tests that after reconfiguration, registration ids are reconfigured to
* remain the same as the preceding KryoSerializer.
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/type-with-avro-serialized-using-kryo b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
new file mode 100644
index 0000000..3901024
--- /dev/null
+++ b/flink-core/src/test/resources/type-with-avro-serialized-using-kryo
@@ -0,0 +1 @@
+
(<
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-core/src/test/resources/type-without-avro-serialized-using-kryo
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/type-without-avro-serialized-using-kryo b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo
new file mode 100644
index 0000000..d95094c
Binary files /dev/null and b/flink-core/src/test/resources/type-without-avro-serialized-using-kryo differ
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
new file mode 100644
index 0000000..060cfdd
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroKryoSerializerRegistrationsTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Registration;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that the set of Kryo registrations is the same across compatible
+ * Flink versions.
+ *
+ * <p>Special version of {@code KryoSerializerRegistrationsTest} that sits in the Avro module
+ * and verifies that we correctly register Avro types at the {@link KryoSerializer} when
+ * Avro is present.
+ */
+public class AvroKryoSerializerRegistrationsTest {
+
+ /**
+ * Tests that the registered classes in Kryo did not change.
+ *
+ * <p>Once we have proper serializer versioning this test will become obsolete.
+ * But currently a change in the serializers can break savepoint backwards
+ * compatibility between Flink versions.
+ */
+ @Test
+ public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
+ final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();
+
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+ getClass().getClassLoader().getResourceAsStream("flink_11-kryo_registrations")))) {
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ String[] split = line.split(",");
+ final int tag = Integer.parseInt(split[0]);
+ final String registeredClass = split[1];
+
+ Registration registration = kryo.getRegistration(tag);
+
+ if (registration == null) {
+ fail(String.format("Registration for %d = %s got lost", tag, registeredClass));
+ }
+ else if (!registeredClass.equals(registration.getType().getName())) {
+ fail(String.format("Registration for %d = %s changed to %s",
+ tag, registeredClass, registration.getType().getName()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a Kryo serializer and writes the default registrations out to a
+ * comma separated file with one entry per line:
+ *
+ * <pre>
+ * id,class
+ * </pre>
+ *
+ * <p>The produced file is used to check that the registered IDs don't change
+ * in future Flink versions.
+ *
+ * <p>This method is not used in the tests, but documents how the test file
+ * has been created and can be used to re-create it if needed.
+ *
+ * @param filePath File path to write registrations to
+ */
+ private void writeDefaultKryoRegistrations(String filePath) throws IOException {
+ final File file = new File(filePath);
+ if (file.exists()) {
+ assertTrue(file.delete());
+ }
+
+ final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();
+ final int nextId = kryo.getNextRegistrationId();
+
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+ for (int i = 0; i < nextId; i++) {
+ Registration registration = kryo.getRegistration(i);
+ String str = registration.getId() + "," + registration.getType().getName();
+ writer.write(str, 0, str.length());
+ writer.newLine();
+ }
+
+ System.out.println("Created file with registrations at " + file.getAbsolutePath());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
new file mode 100644
index 0000000..7000e62
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations
@@ -0,0 +1,86 @@
+0,int
+1,java.lang.String
+2,float
+3,boolean
+4,byte
+5,char
+6,short
+7,long
+8,double
+9,void
+10,scala.collection.convert.Wrappers$SeqWrapper
+11,scala.collection.convert.Wrappers$IteratorWrapper
+12,scala.collection.convert.Wrappers$MapWrapper
+13,scala.collection.convert.Wrappers$JListWrapper
+14,scala.collection.convert.Wrappers$JMapWrapper
+15,scala.Some
+16,scala.util.Left
+17,scala.util.Right
+18,scala.collection.immutable.Vector
+19,scala.collection.immutable.Set$Set1
+20,scala.collection.immutable.Set$Set2
+21,scala.collection.immutable.Set$Set3
+22,scala.collection.immutable.Set$Set4
+23,scala.collection.immutable.HashSet$HashTrieSet
+24,scala.collection.immutable.Map$Map1
+25,scala.collection.immutable.Map$Map2
+26,scala.collection.immutable.Map$Map3
+27,scala.collection.immutable.Map$Map4
+28,scala.collection.immutable.HashMap$HashTrieMap
+29,scala.collection.immutable.Range$Inclusive
+30,scala.collection.immutable.NumericRange$Inclusive
+31,scala.collection.immutable.NumericRange$Exclusive
+32,scala.collection.mutable.BitSet
+33,scala.collection.mutable.HashMap
+34,scala.collection.mutable.HashSet
+35,scala.collection.convert.Wrappers$IterableWrapper
+36,scala.Tuple1
+37,scala.Tuple2
+38,scala.Tuple3
+39,scala.Tuple4
+40,scala.Tuple5
+41,scala.Tuple6
+42,scala.Tuple7
+43,scala.Tuple8
+44,scala.Tuple9
+45,scala.Tuple10
+46,scala.Tuple11
+47,scala.Tuple12
+48,scala.Tuple13
+49,scala.Tuple14
+50,scala.Tuple15
+51,scala.Tuple16
+52,scala.Tuple17
+53,scala.Tuple18
+54,scala.Tuple19
+55,scala.Tuple20
+56,scala.Tuple21
+57,scala.Tuple22
+58,scala.Tuple1$mcJ$sp
+59,scala.Tuple1$mcI$sp
+60,scala.Tuple1$mcD$sp
+61,scala.Tuple2$mcJJ$sp
+62,scala.Tuple2$mcJI$sp
+63,scala.Tuple2$mcJD$sp
+64,scala.Tuple2$mcIJ$sp
+65,scala.Tuple2$mcII$sp
+66,scala.Tuple2$mcID$sp
+67,scala.Tuple2$mcDJ$sp
+68,scala.Tuple2$mcDI$sp
+69,scala.Tuple2$mcDD$sp
+70,scala.Symbol
+71,scala.reflect.ClassTag
+72,scala.runtime.BoxedUnit
+73,java.util.Arrays$ArrayList
+74,java.util.BitSet
+75,java.util.PriorityQueue
+76,java.util.regex.Pattern
+77,java.sql.Date
+78,java.sql.Time
+79,java.sql.Timestamp
+80,java.net.URI
+81,java.net.InetSocketAddress
+82,java.util.UUID
+83,java.util.Locale
+84,java.text.SimpleDateFormat
+85,org.apache.avro.generic.GenericData$Array
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index bd57d17..a561cca 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -88,14 +88,6 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
- <!-- we include Avro to make the CEPMigrationTest work, it uses a Kryo-serialized savepoint (see FLINK-7420) -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index dc322c3..c333397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -168,8 +168,8 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
DataInputViewStream dis = new DataInputViewStream(in);
ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
try (
- TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois =
- new TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, userCodeClassLoader)) {
+ TypeSerializerSerializationUtil.FailureTolerantObjectInputStream ois =
+ new TypeSerializerSerializationUtil.FailureTolerantObjectInputStream(dis, userCodeClassLoader)) {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
TypeSerializer<S> stateSerializer = (TypeSerializer<S>) ois.readObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
index 77d2a1a..cbe9394 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/misc/KryoSerializerRegistrationsTest.java
@@ -33,6 +33,8 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -72,6 +74,15 @@ public class KryoSerializerRegistrationsTest {
if (registration == null) {
fail(String.format("Registration for %d = %s got lost", tag, registeredClass));
}
+ else if (registeredClass.equals("org.apache.avro.generic.GenericData$Array")) {
+ // starting with Flink 1.4 Avro is no longer a dependency of core. Avro is
+ // only available if flink-avro is present. There is a special version of
+ // this test in AvroKryoSerializerRegistrationsTest that verifies correct
+ // registration of Avro types if present
+ assertThat(
+ registration.getType().getName(),
+ is("org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass"));
+ }
else if (!registeredClass.equals(registration.getType().getName())) {
fail(String.format("Registration for %d = %s changed to %s",
tag, registeredClass, registration.getType().getName()));
http://git-wip-us.apache.org/repos/asf/flink/blob/29249b2e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 773dc34..b93251b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,11 +215,11 @@ under the License.
</dependencies>
<!-- this section defines the module versions that are used if nothing else is specified. -->
-
+
<dependencyManagement>
- <!-- WARN:
- DO NOT put guava,
- protobuf,
+ <!-- WARN:
+ DO NOT put guava,
+ protobuf,
asm,
netty
here. It will overwrite Hadoop's guava dependency (even though we handle it
@@ -367,7 +367,7 @@ under the License.
<artifactId>joda-convert</artifactId>
<version>1.7</version>
</dependency>
-
+
<!-- kryo used in different versions by Flink and chill -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
@@ -579,7 +579,7 @@ under the License.
<outputDir>${project.build.directory}/spotbugs</outputDir>
<!-- A list of available stylesheets can be found here: https://github.com/findbugsproject/findbugs/tree/master/findbugs/src/xsl -->
<stylesheet>plain.xsl</stylesheet>
-
+
<fileMappers>
<fileMapper
implementation="org.codehaus.plexus.components.io.filemappers.FileExtensionMapper">
@@ -772,7 +772,7 @@ under the License.
</plugins>
</build>
</profile>
-
+
<profile>
<!--japicmp 0.7 does not support deactivation from the command
line, so we have to use a workaround with profiles instead.
@@ -842,7 +842,7 @@ under the License.
</dependency>
</dependencies>
</profile>
-
+
<profile>
<id>release</id>
<properties>
@@ -1027,6 +1027,7 @@ under the License.
<!-- Test Data. -->
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
+ <exclude>flink-formats/flink-avro/src/test/resources/flink_11-kryo_registrations</exclude>
<exclude>flink-runtime/src/test/resources/flink_11-kryo_registrations</exclude>
<exclude>flink-core/src/test/resources/kryo-serializer-config-snapshot-v1</exclude>
<exclude>flink-formats/flink-avro/src/test/resources/avro/*.avsc</exclude>
@@ -1287,7 +1288,7 @@ under the License.
</plugin>
</plugins>
- <!-- Plugin configurations for plugins activated in sub-projects -->
+ <!-- Plugin configurations for plugins activated in sub-projects -->
<pluginManagement>
<plugins>
@@ -1310,7 +1311,7 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
</plugin>
-
+
<!-- Disable certain plugins in Eclipse -->
<plugin>
<groupId>org.eclipse.m2e</groupId>