You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/17 14:52:18 UTC

flink git commit: [FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

Repository: flink
Updated Branches:
  refs/heads/master 2bfead7d9 -> d8a467b01


[FLINK-6600] Add key serializer config snapshot to keyed backend checkpoints

This commit adds the config snapshot of the key serializer of keyed
backends to its checkpoints. This allows the opportunity to upgrade key
serializers, as well as state migration in the future in the case of
incompatible old and new key serializers.

This closes #3925.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a467b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a467b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a467b0

Branch: refs/heads/master
Commit: d8a467b01ab63127dbf563b6aa8c68fe5d9c85d4
Parents: 2bfead7
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Wed May 17 01:15:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Wed May 17 22:46:51 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 30 ++++++++++
 .../state/KeyedBackendSerializationProxy.java   | 61 ++++++++++++++++++--
 .../state/heap/HeapKeyedStateBackend.java       | 15 +++++
 .../runtime/state/SerializationProxiesTest.java | 54 ++++++++++++++++-
 .../runtime/state/StateBackendTestBase.java     | 42 ++++++++++++++
 5 files changed, 197 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 4bd94fd..ddc7e17 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1116,6 +1116,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 * @throws RocksDBException
 		 */
+		@SuppressWarnings("unchecked")
 		private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
 
 			KeyedBackendSerializationProxy serializationProxy =
@@ -1123,6 +1124,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			serializationProxy.read(currentStateHandleInView);
 
+			// check for key serializer compatibility; this also reconfigures the
+			// key serializer to be compatible, if it is required and is possible
+			if (StateMigrationUtil.resolveCompatibilityResult(
+					serializationProxy.getKeySerializer(),
+					TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+					serializationProxy.getKeySerializerConfigSnapshot(),
+					(TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+				.isRequiresMigration()) {
+
+				// TODO replace with state migration; note that key hash codes need to remain the same after migration
+				throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+					"Aborting now since state migration is currently not available");
+			}
+
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 					serializationProxy.getStateMetaInfoSnapshots();
 
@@ -1214,6 +1229,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			this.stateBackend = stateBackend;
 		}
 
+		@SuppressWarnings("unchecked")
 		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
 				StreamStateHandle metaStateHandle) throws Exception {
 
@@ -1228,6 +1244,20 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 
+				// check for key serializer compatibility; this also reconfigures the
+				// key serializer to be compatible, if it is required and is possible
+				if (StateMigrationUtil.resolveCompatibilityResult(
+						serializationProxy.getKeySerializer(),
+						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+						serializationProxy.getKeySerializerConfigSnapshot(),
+						(TypeSerializer) stateBackend.keySerializer)
+					.isRequiresMigration()) {
+
+					// TODO replace with state migration; note that key hash codes need to remain the same after migration
+					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+						"Aborting now since state migration is currently not available");
+				}
+
 				return serializationProxy.getStateMetaInfoSnapshots();
 			} finally {
 				if (inputStream != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index a20628c..94fb9f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -19,10 +19,16 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
 import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -38,6 +44,8 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public static final int VERSION = 3;
 
 	private TypeSerializer<?> keySerializer;
+	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
+
 	private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
 
 	private ClassLoader userCodeClassLoader;
@@ -51,6 +59,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
 
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
 
 		Preconditions.checkNotNull(stateMetaInfoSnapshots);
 		Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
@@ -65,6 +74,10 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		return keySerializer;
 	}
 
+	public TypeSerializerConfigSnapshot getKeySerializerConfigSnapshot() {
+		return keySerializerConfigSnapshot;
+	}
+
 	@Override
 	public int getVersion() {
 		return VERSION;
@@ -80,10 +93,24 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public void write(DataOutputView out) throws IOException {
 		super.write(out);
 
-		new TypeSerializerSerializationProxy<>(keySerializer).write(out);
+		// write in a way to be fault tolerant of read failures when deserializing the key serializer
+		try (
+			ByteArrayOutputStreamWithPos buffer = new ByteArrayOutputStreamWithPos();
+			DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(buffer)){
 
-		out.writeShort(stateMetaInfoSnapshots.size());
+			new TypeSerializerSerializationProxy<>(keySerializer).write(bufferWrapper);
+
+			// write offset of key serializer's configuration snapshot
+			out.writeInt(buffer.getPosition());
+			TypeSerializerUtil.writeSerializerConfigSnapshot(bufferWrapper, keySerializerConfigSnapshot);
 
+			// flush buffer
+			out.writeInt(buffer.getPosition());
+			out.write(buffer.getBuf(), 0, buffer.getPosition());
+		}
+
+		// write individual registered keyed state metainfos
+		out.writeShort(stateMetaInfoSnapshots.size());
 		for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo : stateMetaInfoSnapshots) {
 			KeyedBackendStateMetaInfoSnapshotReaderWriters
 				.getWriterForVersion(VERSION, metaInfo)
@@ -97,8 +124,34 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 
 		final TypeSerializerSerializationProxy<?> keySerializerProxy =
 			new TypeSerializerSerializationProxy<>(userCodeClassLoader);
-		keySerializerProxy.read(in);
-		this.keySerializer = keySerializerProxy.getTypeSerializer();
+
+		// only starting from version 3, we have the key serializer and its config snapshot written
+		if (getReadVersion() >= 3) {
+			int keySerializerConfigSnapshotOffset = in.readInt();
+			int numBufferedBytes = in.readInt();
+			byte[] keySerializerAndConfigBytes = new byte[numBufferedBytes];
+			in.readFully(keySerializerAndConfigBytes);
+
+			try (
+				ByteArrayInputStreamWithPos buffer = new ByteArrayInputStreamWithPos(keySerializerAndConfigBytes);
+				DataInputViewStreamWrapper bufferWrapper = new DataInputViewStreamWrapper(buffer)) {
+
+				try {
+					keySerializerProxy.read(bufferWrapper);
+					this.keySerializer = keySerializerProxy.getTypeSerializer();
+				} catch (IOException e) {
+					this.keySerializer = null;
+				}
+
+				buffer.setPosition(keySerializerConfigSnapshotOffset);
+				this.keySerializerConfigSnapshot =
+					TypeSerializerUtil.readSerializerConfigSnapshot(bufferWrapper, userCodeClassLoader);
+			}
+		} else {
+			keySerializerProxy.read(in);
+			this.keySerializer = keySerializerProxy.getTypeSerializer();
+			this.keySerializerConfigSnapshot = null;
+		}
 
 		int numKvStates = in.readShort();
 		stateMetaInfoSnapshots = new ArrayList<>(numKvStates);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 11e7760..8d3d8a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.StateMigrationUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -385,6 +386,20 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.read(inView);
 
+				// check for key serializer compatibility; this also reconfigures the
+				// key serializer to be compatible, if it is required and is possible
+				if (StateMigrationUtil.resolveCompatibilityResult(
+						serializationProxy.getKeySerializer(),
+						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
+						serializationProxy.getKeySerializerConfigSnapshot(),
+						(TypeSerializer) keySerializer)
+					.isRequiresMigration()) {
+
+					// TODO replace with state migration; note that key hash codes need to remain the same after migration
+					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+						"Aborting now since state migration is currently not available");
+				}
+
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 						serializationProxy.getStateMetaInfoSnapshots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 02b4d62..8bbbd5f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -44,7 +44,10 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
+@PrepareForTest({
+	KeyedBackendSerializationProxy.class,
+	KeyedBackendStateMetaInfoSnapshotReaderWriters.class,
+	OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
 public class SerializationProxiesTest {
 
 	@Test
@@ -80,10 +83,59 @@ public class SerializationProxiesTest {
 		}
 
 		Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
+		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 		Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
 	}
 
 	@Test
+	public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializationFailures() throws Exception {
+
+		TypeSerializer<?> keySerializer = IntSerializer.INSTANCE;
+		TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+		TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+		List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoList = new ArrayList<>();
+
+		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
+		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
+		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
+
+		KeyedBackendSerializationProxy serializationProxy =
+			new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			serializationProxy.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		serializationProxy =
+			new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+
+		// mock failure when deserializing serializers
+		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+		doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+		PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			serializationProxy.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(null, serializationProxy.getKeySerializer());
+		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
+
+		for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : serializationProxy.getStateMetaInfoSnapshots()) {
+			Assert.assertEquals(null, meta.getNamespaceSerializer());
+			Assert.assertEquals(null, meta.getStateSerializer());
+			Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot());
+			Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot());
+		}
+	}
+
+	@Test
 	public void testKeyedStateMetaInfoSerialization() throws Exception {
 
 		String name = "test";

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a467b0/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index ca66ffb..b1927f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -38,6 +38,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -1805,6 +1806,47 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	}
 
 	@Test
+	public void testRestoreWithWrongKeySerializer() {
+		try {
+			CheckpointStreamFactory streamFactory = createStreamFactory();
+
+			// use an IntSerializer at first
+			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+			ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+
+			ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+			// write some state
+			backend.setCurrentKey(1);
+			state.update("1");
+			backend.setCurrentKey(2);
+			state.update("2");
+
+			// draw a snapshot
+			KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint()));
+
+			backend.dispose();
+
+			// restore with the wrong key serializer
+			try {
+				restoreKeyedBackend(DoubleSerializer.INSTANCE, snapshot1);
+
+				fail("should recognize wrong key serializer");
+			} catch (RuntimeException e) {
+				if (!e.getMessage().contains("The new key serializer is not compatible")) {
+					fail("wrong exception " + e);
+				}
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
 	@SuppressWarnings("unchecked")
 	public void testValueStateRestoreWithWrongSerializers() {
 		try {