You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:22 UTC

[03/15] flink git commit: [FLINK-6869] [core] Tolerate serialVersionUID mismatches for Scala and anonymous serializers

[FLINK-6869] [core] Tolerate serialVersionUID mismatches for Scala and anonymous serializers

This commit makes the TypeSerializerSerializationProxy tolerate
serialVersionUID mismatches when reading serializers that are anonymous
classes, or our Scala serializers.

Our Scala serializers require this since they are generated at compile
time using Scala macros, and it is therefore not possible to fix a
specific serialVersionUID for them. Non-generated Scala serializers
need this as well, because their pre-1.3 serialVersionUIDs may vary
depending on the Scala version used.

This can be seen as a workaround, and should be reverted once 1.2
savepoint compatibility is no longer maintained.

This commit also updates the streaming state docs to advise users to
avoid using anonymous classes for their state serializers.

This closes #4090.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b216a4a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b216a4a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b216a4a0

Branch: refs/heads/master
Commit: b216a4a0acf4e4d0463c3ed961d6a0258223491a
Parents: 75ea808
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Jun 10 22:41:35 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:37:01 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md                        |   7 +
 .../TypeSerializerSerializationUtil.java        |  82 +++++++++-
 .../TypeSerializerSerializationUtilTest.java    | 158 ++++++++++++++++++-
 ...ckendStateMetaInfoSnapshotReaderWriters.java |  14 +-
 4 files changed, 254 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 97f0c29..0025fae 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -453,6 +453,13 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =
 checkpointedState = getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
 
+Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
+subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
+anonymous classes as your state serializers. Anonymous classes have no guarantee on the generated classname, which
+varies across compilers and depends on the order in which they are declared within the enclosing class. This can
+easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the
+classpath).
+
 ### Handling serializer upgrades and compatibility
 
 Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
index 3d79d9a..058ef46 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
@@ -32,10 +32,16 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InvalidClassException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Utility methods for serialization of {@link TypeSerializer} and {@link TypeSerializerConfigSnapshot}.
@@ -46,6 +52,67 @@ public class TypeSerializerSerializationUtil {
 	private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationUtil.class);
 
 	/**
+	 * This is maintained as a temporary workaround for FLINK-6869.
+	 *
+	 * <p>Before 1.3, the Scala serializers did not specify the serialVersionUID.
+	 * Although since 1.3 they are properly specified, we still have to ignore them for now
+	 * as their previous serialVersionUIDs will vary depending on the Scala version.
+	 *
+	 * <p>This can be removed once 1.2 is no longer supported.
+	 */
+	private static Set<String> scalaSerializerClassnames = new HashSet<>();
+	static {
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
+		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
+	}
+
+	/**
+	 * An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of
+	 * anonymous classes or our Scala serializer classes.
+	 *
+	 * <p>The {@link TypeSerializerSerializationProxy} uses this specific object input stream to read serializers,
+	 * so that mismatching serialVersionUIDs of anonymous classes / Scala serializers are ignored.
+	 * This is a required workaround to maintain backwards compatibility for our pre-1.3 Scala serializers.
+	 * See FLINK-6869 for details.
+	 *
+	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6869">FLINK-6869</a>
+	 */
+	public static class SerialUIDMismatchTolerantInputStream extends InstantiationUtil.ClassLoaderObjectInputStream {
+
+		public SerialUIDMismatchTolerantInputStream(InputStream in, ClassLoader cl) throws IOException {
+			super(in, cl);
+		}
+
+		@Override
+		protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
+			ObjectStreamClass streamClassDescriptor = super.readClassDescriptor();
+
+			Class localClass = resolveClass(streamClassDescriptor);
+			if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass()
+				// isAnonymousClass does not work for anonymous Scala classes; additionally check by classname
+				|| localClass.getName().contains("$anon$") || localClass.getName().contains("$anonfun")) {
+
+				ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
+				if (localClassDescriptor != null
+					&& localClassDescriptor.getSerialVersionUID() != streamClassDescriptor.getSerialVersionUID()) {
+					LOG.warn("Ignoring serialVersionUID mismatch for anonymous class {}; was {}, now {}.",
+						streamClassDescriptor.getName(), streamClassDescriptor.getSerialVersionUID(), localClassDescriptor.getSerialVersionUID());
+
+					streamClassDescriptor = localClassDescriptor;
+				}
+			}
+
+			return streamClassDescriptor;
+		}
+	}
+
+	/**
 	 * Writes a {@link TypeSerializer} to the provided data output view.
 	 *
 	 * <p>It is written with a format that can be later read again using
@@ -354,6 +421,7 @@ public class TypeSerializerSerializationUtil {
 			}
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
 		public void read(DataInputView in) throws IOException {
 			super.read(in);
@@ -362,8 +430,14 @@ public class TypeSerializerSerializationUtil {
 			int serializerBytes = in.readInt();
 			byte[] buffer = new byte[serializerBytes];
 			in.readFully(buffer);
-			try {
-				typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
+
+			ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+			try (
+				SerialUIDMismatchTolerantInputStream ois =
+					new SerialUIDMismatchTolerantInputStream(new ByteArrayInputStream(buffer), userClassLoader)) {
+
+				Thread.currentThread().setContextClassLoader(userClassLoader);
+				typeSerializer = (TypeSerializer<T>) ois.readObject();
 			} catch (ClassNotFoundException | InvalidClassException e) {
 				if (useDummyPlaceholder) {
 					// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
@@ -372,8 +446,10 @@ public class TypeSerializerSerializationUtil {
 						new UnloadableDummyTypeSerializer<>(buffer);
 					LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
 				} else {
-					throw new IOException("Missing class for type serializer.", e);
+					throw new IOException("Unloadable class for type serializer.", e);
 				}
+			} finally {
+				Thread.currentThread().setContextClassLoader(previousClassLoader);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
index 738644b..10df619 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtilTest.java
@@ -29,7 +29,9 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -39,8 +41,11 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InvalidClassException;
+import java.io.ObjectStreamClass;
+import java.io.Serializable;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
@@ -55,7 +60,10 @@ import static org.mockito.Mockito.mock;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(TypeSerializerSerializationUtil.class)
-public class TypeSerializerSerializationUtilTest {
+public class TypeSerializerSerializationUtilTest implements Serializable {
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
 
 	/**
 	 * Verifies that reading and writing serializers work correctly.
@@ -236,6 +244,36 @@ public class TypeSerializerSerializationUtilTest {
 		Assert.assertEquals(DoubleSerializer.INSTANCE.snapshotConfiguration(), restored.get(1).f1);
 	}
 
+	/**
+	 * Verifies that serializers of anonymous classes can be deserialized, even if serialVersionUID changes.
+	 */
+	@Test
+	public void testAnonymousSerializerClassWithChangedSerialVersionUID() throws Exception {
+		TypeSerializer<Integer> anonymousClassSerializer = new AbstractIntSerializer() {};
+		// assert that our assumption holds
+		Assert.assertTrue(anonymousClassSerializer.getClass().isAnonymousClass());
+
+		byte[] anonymousSerializerBytes;
+		try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), anonymousClassSerializer);
+			anonymousSerializerBytes = out.toByteArray();
+		}
+
+		long newSerialVersionUID = 1234567L;
+		// assert that we're actually modifying to a different serialVersionUID
+		Assert.assertNotEquals(ObjectStreamClass.lookup(anonymousClassSerializer.getClass()).getSerialVersionUID(), newSerialVersionUID);
+		modifySerialVersionUID(anonymousSerializerBytes, anonymousClassSerializer.getClass().getName(), newSerialVersionUID);
+
+		TypeSerializer<Integer> restoredSerializer;
+		try (ByteArrayInputStream in = new ByteArrayInputStream(anonymousSerializerBytes)) {
+			restoredSerializer = TypeSerializerSerializationUtil.tryReadSerializer(new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+		}
+
+		// serializer should have been deserialized despite the mismatch, as an instance of the very same class
+		Assert.assertNotNull(restoredSerializer);
+		Assert.assertEquals(anonymousClassSerializer.getClass(), restoredSerializer.getClass());
+	}
+
 	public static class TestConfigSnapshot extends TypeSerializerConfigSnapshot {
 
 		static final int VERSION = 1;
@@ -292,4 +330,122 @@ public class TypeSerializerSerializationUtilTest {
 			return 31 * val + msg.hashCode();
 		}
 	}
+
+	/**
+	 * Overwrites, in place, the serialVersionUID written for the given class in a Java-serialized byte stream.
+	 *
+	 * <p>Java serialization writes a class descriptor's name directly followed by its 8-byte serialVersionUID,
+	 * so the 8 bytes right after the first complete occurrence of the classname are replaced.
+	 *
+	 * @param objectBytes the serialized bytes; modified in place
+	 * @param classname fully qualified name of the class whose serialVersionUID is replaced
+	 * @param newSerialVersionUID the serialVersionUID to write
+	 */
+	private static void modifySerialVersionUID(byte[] objectBytes, String classname, long newSerialVersionUID) throws Exception {
+		// class names are written in modified UTF-8, which equals standard UTF-8 for the ASCII names used here
+		byte[] classnameBytes = classname.getBytes("UTF-8");
+		int uidLength = Long.SIZE / Byte.SIZE;
+
+		// find the first complete match of the classname that still leaves room for the serialVersionUID bytes
+		int serialVersionUIDOffset = -1;
+		search:
+		for (int start = 0; start + classnameBytes.length + uidLength <= objectBytes.length; start++) {
+			for (int i = 0; i < classnameBytes.length; i++) {
+				if (objectBytes[start + i] != classnameBytes[i]) {
+					continue search;
+				}
+			}
+			serialVersionUIDOffset = start + classnameBytes.length;
+			break;
+		}
+
+		if (serialVersionUIDOffset < 0) {
+			throw new RuntimeException("Could not find class " + classname + " in object byte stream.");
+		}
+
+		byte[] newUIDBytes = ByteBuffer.allocate(uidLength).putLong(newSerialVersionUID).array();
+
+		// replace original serialVersionUID bytes with new serialVersionUID bytes
+		System.arraycopy(newUIDBytes, 0, objectBytes, serialVersionUIDOffset, newUIDBytes.length);
+	}
+
+	public static abstract class AbstractIntSerializer extends TypeSerializer<Integer> {
+
+		public static final long serialVersionUID = 1;
+
+		@Override
+		public Integer createInstance() {
+			return IntSerializer.INSTANCE.createInstance();
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return IntSerializer.INSTANCE.isImmutableType();
+		}
+
+		@Override
+		public Integer copy(Integer from) {
+			return IntSerializer.INSTANCE.copy(from);
+		}
+
+		@Override
+		public Integer copy(Integer from, Integer reuse) {
+			return IntSerializer.INSTANCE.copy(from, reuse);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public Integer deserialize(DataInputView source) throws IOException {
+			return IntSerializer.INSTANCE.deserialize(source);
+		}
+
+		@Override
+		public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
+			return IntSerializer.INSTANCE.deserialize(reuse, source);
+		}
+
+		@Override
+		public void serialize(Integer record, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.serialize(record, target);
+		}
+
+		@Override
+		public TypeSerializer<Integer> duplicate() {
+			return IntSerializer.INSTANCE.duplicate();
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return IntSerializer.INSTANCE.snapshotConfiguration();
+		}
+
+		@Override
+		public CompatibilityResult<Integer> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			return IntSerializer.INSTANCE.ensureCompatibility(configSnapshot);
+		}
+
+		@Override
+		public int getLength() {
+			return IntSerializer.INSTANCE.getLength();
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return IntSerializer.INSTANCE.canEqual(obj);
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return IntSerializer.INSTANCE.equals(obj);
+		}
+
+		@Override
+		public int hashCode() {
+			return IntSerializer.INSTANCE.hashCode();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b216a4a0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
index e52323f..dc322c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,6 +156,7 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 			super(userCodeClassLoader);
 		}
 
+		@SuppressWarnings("unchecked")
 		@Override
 		public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException {
 			RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
@@ -164,12 +164,20 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
 
 			stateMetaInfo.setName(in.readUTF());
 			stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
 			DataInputViewStream dis = new DataInputViewStream(in);
-			try {
-				TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader);
+			ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
+			try (
+				TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream ois =
+					new TypeSerializerSerializationUtil.SerialUIDMismatchTolerantInputStream(dis, userCodeClassLoader)) {
+
+				Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+				TypeSerializer<S> stateSerializer = (TypeSerializer<S>) ois.readObject();
 				stateMetaInfo.setPartitionStateSerializer(stateSerializer);
 			} catch (ClassNotFoundException exception) {
 				throw new IOException(exception);
+			} finally {
+				Thread.currentThread().setContextClassLoader(previousClassLoader);
 			}
 
 			// old versions do not contain the partition state serializer's configuration snapshot