You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:00 UTC
[1/5] flink git commit: [FLINK-5051] Add Serde Proxies for
Serializers and State Backend Data
Repository: flink
Updated Branches:
refs/heads/master bf2874e22 -> 21d1d8b49
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 96e23d6..797150a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.KeyGroupRange;
import java.util.Arrays;
@@ -26,11 +27,8 @@ import java.util.Map;
public class StateTable<K, N, ST> {
- /** Serializer for the state value. The state value could be a List<V>, for example. */
- protected final TypeSerializer<ST> stateSerializer;
-
- /** The serializer for the namespace */
- protected final TypeSerializer<N> namespaceSerializer;
+ /** Combined meta information such as name and serializers for this state */
+ protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;
/** Map for holding the actual state objects. */
private final List<Map<N, Map<K, ST>>> state;
@@ -38,11 +36,9 @@ public class StateTable<K, N, ST> {
protected final KeyGroupRange keyGroupRange;
public StateTable(
- TypeSerializer<ST> stateSerializer,
- TypeSerializer<N> namespaceSerializer,
+ RegisteredBackendStateMetaInfo<N, ST> metaInfo,
KeyGroupRange keyGroupRange) {
- this.stateSerializer = stateSerializer;
- this.namespaceSerializer = namespaceSerializer;
+ this.metaInfo = metaInfo;
this.keyGroupRange = keyGroupRange;
this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()]);
@@ -64,11 +60,19 @@ public class StateTable<K, N, ST> {
}
public TypeSerializer<ST> getStateSerializer() {
- return stateSerializer;
+ return metaInfo.getStateSerializer();
}
public TypeSerializer<N> getNamespaceSerializer() {
- return namespaceSerializer;
+ return metaInfo.getNamespaceSerializer();
+ }
+
+ public RegisteredBackendStateMetaInfo<N, ST> getMetaInfo() {
+ return metaInfo;
+ }
+
+ public void setMetaInfo(RegisteredBackendStateMetaInfo<N, ST> metaInfo) {
+ this.metaInfo = metaInfo;
}
public List<Map<N, Map<K, ST>>> getState() {
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 4279635..2076c08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -266,13 +267,20 @@ public class QueryableStateClientTest {
serverStats[i] = new AtomicKvStateRequestStats();
servers[i] = new KvStateServer(InetAddress.getLocalHost(), 0, 1, 1, registries[i], serverStats[i]);
servers[i].start();
+ ValueStateDescriptor<Integer> descriptor =
+ new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
+ RegisteredBackendStateMetaInfo<VoidNamespace, Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>(
+ descriptor.getType(),
+ descriptor.getName(),
+ VoidNamespaceSerializer.INSTANCE,
+ IntSerializer.INSTANCE);
// Register state
HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
keyedStateBackend,
- new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null),
- new StateTable<Integer, VoidNamespace, Integer>(IntSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE, new KeyGroupRange(0, 1)),
+ descriptor,
+ new StateTable<Integer, VoidNamespace, Integer>(registeredBackendStateMetaInfo, new KeyGroupRange(0, 1)),
IntSerializer.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 51e3739..648d762 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -134,7 +134,7 @@ public class OperatorStateBackendTest {
operatorStateBackend = abstractStateBackend.restoreOperatorStateBackend(
createMockEnvironment(), "testOperator", Collections.singletonList(stateHandle));
- assertEquals(0, operatorStateBackend.getRegisteredStateNames().size());
+ assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
listState1 = operatorStateBackend.getOperatorState(stateDescriptor1);
listState2 = operatorStateBackend.getOperatorState(stateDescriptor2);
@@ -155,7 +155,6 @@ public class OperatorStateBackendTest {
operatorStateBackend.dispose();
} finally {
-
stateHandle.discardState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
new file mode 100644
index 0000000..9211e92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SerializationProxiesTest {
+
+ @Test
+ public void testSerializationRoundtrip() throws Exception {
+
+ TypeSerializer<?> keySerializer = IntSerializer.INSTANCE;
+ TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+ TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoList = new ArrayList<>();
+
+ stateMetaInfoList.add(
+ new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer));
+ stateMetaInfoList.add(
+ new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer));
+ stateMetaInfoList.add(
+ new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer));
+
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ serializationProxy.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ serializationProxy =
+ new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ serializationProxy.read(new DataInputViewStreamWrapper(in));
+ }
+
+
+ Assert.assertEquals(keySerializer, serializationProxy.getKeySerializerProxy().getTypeSerializer());
+ Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies());
+ }
+
+ @Test
+ public void testMetaInfoSerialization() throws Exception {
+
+ String name = "test";
+ TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+ TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfo =
+ new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer);
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ metaInfo.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ metaInfo = new KeyedBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ metaInfo.read(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertEquals(name, metaInfo.getStateName());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 0a3a092..aad84df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -923,7 +923,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
fail("should recognize wrong serializers");
} catch (RuntimeException e) {
- if (!e.getMessage().contains("Trying to access state using wrong StateDescriptor")) {
+ if (!e.getMessage().contains("Trying to access state using wrong")) {
fail("wrong exception " + e);
}
// expected
@@ -974,7 +974,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
fail("should recognize wrong serializers");
} catch (RuntimeException e) {
- if (!e.getMessage().contains("Trying to access state using wrong StateDescriptor")) {
+ if (!e.getMessage().contains("Trying to access state using wrong")) {
fail("wrong exception " + e);
}
// expected
@@ -1027,7 +1027,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
fail("should recognize wrong serializers");
} catch (RuntimeException e) {
- if (!e.getMessage().contains("Trying to access state using wrong StateDescriptor")) {
+ if (!e.getMessage().contains("Trying to access state using wrong ")) {
fail("wrong exception " + e);
}
// expected
[5/5] flink git commit: [FLINK-5283] Fix closing streams when
restoring old savepoint in keyed backends
Posted by al...@apache.org.
[FLINK-5283] Fix closing streams when restoring old savepoint in keyed backends
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35f4ea78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35f4ea78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35f4ea78
Branch: refs/heads/master
Commit: 35f4ea787c55eceede5154fc1ff23c70cdc522b4
Parents: bf2874e
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 7 21:23:35 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../contrib/streaming/state/RocksDBKeyedStateBackend.java | 8 +++++---
.../flink/runtime/state/heap/HeapKeyedStateBackend.java | 6 ++++--
2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/35f4ea78/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 8637f6b..5fef5e5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1090,8 +1090,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
Preconditions.checkState(1 == restoreState.size(), "Only one element expected here.");
- HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates =
- InstantiationUtil.deserializeObject(restoreState.iterator().next().openInputStream(), userCodeClassLoader);
+ HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates;
+ try (FSDataInputStream inputStream = restoreState.iterator().next().openInputStream()) {
+ namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+ }
Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
@@ -1101,7 +1103,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// first get the column family mapping
int numColumns = inputView.readInt();
- Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+ Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);
for (int i = 0; i < numColumns; i++) {
byte mappingByte = inputView.readByte();
http://git-wip-us.apache.org/repos/asf/flink/blob/35f4ea78/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aab2ee5..6e85b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -378,8 +378,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here.");
- HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates =
- InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(), userCodeClassLoader);
+ HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates;
+ try (FSDataInputStream inputStream = stateHandles.iterator().next().openInputStream()) {
+ namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+ }
for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet()) {
[3/5] flink git commit: [FLINK-5282] Fix closing streams on exception
in SavepointV0Serializer
Posted by al...@apache.org.
[FLINK-5282] Fix closing streams on exception in SavepointV0Serializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cda6a22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cda6a22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cda6a22
Branch: refs/heads/master
Commit: 8cda6a2260bbbd8e84349f0204d2980cfdd5a48a
Parents: 35f4ea7
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Dec 7 21:25:29 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../savepoint/SavepointV0Serializer.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8cda6a22/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index e4125e5..dc307e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
@@ -286,16 +287,21 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
- final long offset = keyedStateOut.getPos();
+ try {
+ final long offset = keyedStateOut.getPos();
- InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
- StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+ InstantiationUtil.serializeObject(keyedStateOut, oldKeyedState);
+ StreamStateHandle streamStateHandle = keyedStateOut.closeAndGetHandle();
+ keyedStateOut = null; // makes IOUtils.closeQuietly(...) ignore this
- if (null != streamStateHandle) {
- KeyGroupRangeOffsets keyGroupRangeOffsets =
- new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
+ if (null != streamStateHandle) {
+ KeyGroupRangeOffsets keyGroupRangeOffsets =
+ new KeyGroupRangeOffsets(parallelInstanceIdx, parallelInstanceIdx, new long[]{offset});
- return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+ return new MigrationKeyGroupStateHandle(keyGroupRangeOffsets, streamStateHandle);
+ }
+ } finally {
+ IOUtils.closeQuietly(keyedStateOut);
}
}
return null;
[4/5] flink git commit: [FLINK-5041] Savepoint Backwards
Compatibility 1.1 -> 1.2
Posted by al...@apache.org.
[FLINK-5041] Savepoint Backwards Compatibility 1.1 -> 1.2
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e95fe56b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e95fe56b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e95fe56b
Branch: refs/heads/master
Commit: e95fe56b60158fc4df511a1404f7346bd18b8f12
Parents: 8cda6a2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 24 17:29:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBFoldingState.java | 6 ++--
.../org/apache/flink/core/io/Versioned.java | 31 ++++++++++++++++++
.../java/org/apache/flink/util/Migration.java | 25 ++++++++++++++
.../savepoint/SavepointV0Serializer.java | 7 ++--
.../runtime/state/AbstractStateBackend.java | 6 +++-
.../state/MigrationKeyGroupStateHandle.java | 3 +-
.../state/MigrationStreamStateHandle.java | 13 ++++++--
.../runtime/checkpoint/savepoint/Savepoint.java | 13 ++------
.../savepoint/SavepointV1Serializer.java | 16 ++++-----
.../savepoint/MigrationV0ToV1Test.java | 13 ++------
.../runtime/state/OperatorStateBackendTest.java | 2 +-
.../api/operators/AbstractStreamOperator.java | 34 +++++++++++---------
.../operators/AbstractUdfStreamOperator.java | 8 +++++
13 files changed, 122 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
index 3018f7b..9c2bf4f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
/**
@@ -85,7 +85,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
if (valueBytes == null) {
return null;
}
- return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+ return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
} catch (IOException|RocksDBException e) {
throw new RuntimeException("Error while retrieving data from RocksDB", e);
}
@@ -103,7 +103,7 @@ public class RocksDBFoldingState<K, N, T, ACC>
valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
} else {
- ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
+ ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
ACC newValue = foldFunction.fold(oldValue, value);
keySerializationStream.reset();
valueSerializer.serialize(newValue, out);
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
new file mode 100644
index 0000000..b36d5e8
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/Versioned.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+/**
+ * This interface is implemented by classes that provide a version number. Version numbers can be used to differentiate
+ * between evolving classes.
+ */
+public interface Versioned {
+
+ /**
+ * Returns the version number of the object. Version numbers can be used to differentiate evolving classes.
+ */
+ int getVersion();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-core/src/main/java/org/apache/flink/util/Migration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/Migration.java b/flink-core/src/main/java/org/apache/flink/util/Migration.java
new file mode 100644
index 0000000..4bd9e39
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/Migration.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+/**
+ * Tagging interface for migration related classes.
+ */
+public interface Migration {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index dc307e2..6c6a8f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
import org.apache.flink.migration.runtime.checkpoint.TaskState;
+import org.apache.flink.migration.runtime.state.AbstractStateBackend;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.StateHandle;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
@@ -266,10 +267,7 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
}
if (null != operatorState) {
- mergeStateHandles.add(SIGNAL_1);
mergeStateHandles.add(convertStateHandle(operatorState));
- } else {
- mergeStateHandles.add(SIGNAL_0);
}
return new MigrationStreamStateHandle(new MultiStreamStateHandle(mergeStateHandles));
@@ -340,6 +338,9 @@ public class SavepointV0Serializer implements SavepointSerializer<SavepointV1> {
byte[] data =
((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) oldStateHandle).getData();
return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
+ } else if (oldStateHandle instanceof AbstractStateBackend.DataInputViewHandle) {
+ return convertStateHandle(
+ ((AbstractStateBackend.DataInputViewHandle) oldStateHandle).getStreamStateHandle());
}
throw new IllegalArgumentException("Unknown state handle type: " + oldStateHandle);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
index b7932f5..1d76c06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/state/AbstractStateBackend.java
@@ -35,7 +35,7 @@ public abstract class AbstractStateBackend implements Serializable {
/**
* Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
*/
- private static final class DataInputViewHandle implements StateHandle<DataInputView> {
+ public static final class DataInputViewHandle implements StateHandle<DataInputView> {
private static final long serialVersionUID = 2891559813513532079L;
@@ -45,6 +45,10 @@ public abstract class AbstractStateBackend implements Serializable {
this.stream = stream;
}
+ public StreamStateHandle getStreamStateHandle() {
+ return stream;
+ }
+
@Override
public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
index 1bebcb6..995d234 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationKeyGroupStateHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Migration;
@Internal
@Deprecated
@@ -29,7 +30,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
* This class is just a KeyGroupsStateHandle that is tagged as migration, to figure out which restore logic to apply,
* e.g. when restoring backend data from a state handle.
*/
-public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle {
+public class MigrationKeyGroupStateHandle extends KeyGroupsStateHandle implements Migration {
private static final long serialVersionUID = -8554427169776881697L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
index e2da757..e7831a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/state/MigrationStreamStateHandle.java
@@ -20,7 +20,9 @@ package org.apache.flink.migration.state;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataInputStreamWrapper;
import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Migration;
import java.io.IOException;
@@ -30,7 +32,7 @@ import java.io.IOException;
* This class is just a StreamStateHandle that is tagged as migration, to figure out which restore logic to apply, e.g.
* when restoring backend data from a state handle.
*/
-public class MigrationStreamStateHandle implements StreamStateHandle {
+public class MigrationStreamStateHandle implements StreamStateHandle, Migration {
private static final long serialVersionUID = -2332113722532150112L;
private final StreamStateHandle delegate;
@@ -41,7 +43,7 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
@Override
public FSDataInputStream openInputStream() throws IOException {
- return delegate.openInputStream();
+ return new MigrationFSInputStream(delegate.openInputStream());
}
@Override
@@ -53,4 +55,11 @@ public class MigrationStreamStateHandle implements StreamStateHandle {
public long getStateSize() {
return delegate.getStateSize();
}
+
+ static class MigrationFSInputStream extends FSDataInputStreamWrapper implements Migration {
+
+ public MigrationFSInputStream(FSDataInputStream inputStream) {
+ super(inputStream);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 643f14c..baad05f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.core.io.Versioned;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.TaskState;
@@ -34,17 +35,7 @@ import java.util.Collection;
* <p>Savepoints are serialized via a {@link SavepointSerializer} and stored
* via a {@link SavepointStore}.
*/
-public interface Savepoint {
-
- /**
- * Returns the savepoint version.
- *
- * <p>This version is independent of the Flink version, e.g. multiple Flink
- * versions can work the same savepoint version.
- *
- * @return Savepoint version
- */
- int getVersion();
+public interface Savepoint extends Versioned {
/**
* Returns the checkpoint ID of the savepoint.
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index cd3e87f..4d16c13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -211,7 +211,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
duration);
}
- public static void serializeKeyGroupStateHandle(
+ private static void serializeKeyGroupStateHandle(
KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -227,7 +227,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+ private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
@@ -247,7 +247,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static void serializeOperatorStateHandle(
+ private static void serializeOperatorStateHandle(
OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle != null) {
@@ -258,8 +258,8 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
dos.writeUTF(entry.getKey());
long[] offsets = entry.getValue();
dos.writeInt(offsets.length);
- for (int i = 0; i < offsets.length; ++i) {
- dos.writeLong(offsets[i]);
+ for (long offset : offsets) {
+ dos.writeLong(offset);
}
}
serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
@@ -268,7 +268,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static OperatorStateHandle deserializeOperatorStateHandle(
+ private static OperatorStateHandle deserializeOperatorStateHandle(
DataInputStream dis) throws IOException {
final int type = dis.readByte();
@@ -292,7 +292,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
}
- public static void serializeStreamStateHandle(
+ private static void serializeStreamStateHandle(
StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
if (stateHandle == null) {
@@ -319,7 +319,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
dos.flush();
}
- public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+ private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
int type = dis.read();
if (NULL_HANDLE == type) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
index 02365c7..4208fe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java
@@ -147,13 +147,8 @@ public class MigrationV0ToV1Test {
//check operator state
expTestState.f0 = 1;
- if (p % 3 != 0) {
- assertEquals(1, is.read());
- actTestState = InstantiationUtil.deserializeObject(is, cl);
- assertEquals(expTestState, actTestState);
- } else {
- assertEquals(0, is.read());
- }
+ actTestState = InstantiationUtil.deserializeObject(is, cl);
+ assertEquals(expTestState, actTestState);
}
}
@@ -210,9 +205,7 @@ public class MigrationV0ToV1Test {
state.setFunctionState(new SerializedStateHandle<Serializable>(testState));
}
testState = new Tuple4<>(1, i, j, k);
- if (j % 3 != 0) {
- state.setOperatorState(new SerializedStateHandle<>(testState));
- }
+ state.setOperatorState(new SerializedStateHandle<>(testState));
if ((0 == k) && (i % 3 != 0)) {
HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2);
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 2db8735..51e3739 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -160,4 +160,4 @@ public class OperatorStateBackendTest {
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 839abf8..f9b711e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,11 +28,11 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
@@ -58,10 +58,10 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,7 +201,6 @@ public abstract class AbstractStreamOperator<OUT>
if (restoring) {
- // TODO check that there is EITHER old OR new state in handles!
restoreStreamCheckpointed(stateHandles);
//pass directly
@@ -230,18 +229,23 @@ public abstract class AbstractStreamOperator<OUT>
@Deprecated
private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
StreamStateHandle state = stateHandles.getLegacyOperatorState();
- if (this instanceof StreamCheckpointedOperator && null != state) {
+ if (null != state) {
+ if (this instanceof StreamCheckpointedOperator) {
- LOG.debug("Restore state of task {} in chain ({}).",
- stateHandles.getOperatorChainIndex(), getContainingTask().getName());
+ LOG.debug("Restore state of task {} in chain ({}).",
+ stateHandles.getOperatorChainIndex(), getContainingTask().getName());
- FSDataInputStream is = state.openInputStream();
- try {
- getContainingTask().getCancelables().registerClosable(is);
- ((StreamCheckpointedOperator) this).restoreState(is);
- } finally {
- getContainingTask().getCancelables().unregisterClosable(is);
- is.close();
+ FSDataInputStream is = state.openInputStream();
+ try {
+ getContainingTask().getCancelables().registerClosable(is);
+ ((StreamCheckpointedOperator) this).restoreState(is);
+ } finally {
+ getContainingTask().getCancelables().unregisterClosable(is);
+ is.close();
+ }
+ } else {
+ throw new Exception(
+ "Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e95fe56b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index c1f783f..1404958 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Migration;
import java.io.Serializable;
import java.util.ArrayList;
@@ -192,6 +193,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
} catch (Exception e) {
throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
}
+ } else if (userFunction instanceof CheckpointedRestoring) {
+ out.write(0);
}
}
@@ -213,6 +216,11 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
}
}
}
+ } else if (in instanceof Migration) {
+ int hasUdfState = in.read();
+ if (hasUdfState == 1) {
+ throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");
+ }
}
}
[2/5] flink git commit: [FLINK-5051] Add Serde Proxies for
Serializers and State Backend Data
Posted by al...@apache.org.
[FLINK-5051] Add Serde Proxies for Serializers and State Backend Data
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21d1d8b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21d1d8b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21d1d8b4
Branch: refs/heads/master
Commit: 21d1d8b49337e734ce3defb5f1b9344f748cb49e
Parents: e95fe56
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 24 17:29:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 108 +++++----
.../common/state/FoldingStateDescriptor.java | 5 +
.../api/common/state/ListStateDescriptor.java | 5 +
.../common/state/ReducingStateDescriptor.java | 5 +
.../flink/api/common/state/StateDescriptor.java | 7 +
.../api/common/state/ValueStateDescriptor.java | 5 +
.../api/common/typeutils/TypeSerializer.java | 10 +-
.../TypeSerializerSerializationProxy.java | 231 +++++++++++++++++++
.../flink/core/io/VersionMismatchException.java | 44 ++++
.../core/io/VersionedIOReadableWritable.java | 70 ++++++
.../memory/ByteArrayInputStreamWithPos.java | 4 +
.../memory/ByteArrayOutputStreamWithPos.java | 4 +
.../TypeSerializerSerializationProxyTest.java | 94 ++++++++
.../flink/core/io/VersionedIOWriteableTest.java | 148 ++++++++++++
.../state/DefaultOperatorStateBackend.java | 195 +++++++++++-----
.../flink/runtime/state/JavaSerializer.java | 16 +-
.../state/KeyedBackendSerializationProxy.java | 215 +++++++++++++++++
.../OperatorBackendSerializationProxy.java | 140 +++++++++++
.../state/RegisteredBackendStateMetaInfo.java | 132 +++++++++++
.../state/heap/HeapKeyedStateBackend.java | 125 ++++++----
.../flink/runtime/state/heap/StateTable.java | 26 ++-
.../runtime/query/QueryableStateClientTest.java | 12 +-
.../runtime/state/OperatorStateBackendTest.java | 3 +-
.../runtime/state/SerializationProxiesTest.java | 99 ++++++++
.../runtime/state/StateBackendTestBase.java | 6 +-
25 files changed, 1534 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5fef5e5..1c0a4b7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -50,6 +50,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.InstantiationUtil;
@@ -123,7 +125,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> kvStateInformation;
+ private Map<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> kvStateInformation;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -237,7 +239,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// and access it in a synchronized block that locks on #dbDisposeLock.
if (db != null) {
- for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
+ for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
+ kvStateInformation.values()) {
+
column.f0.close();
}
@@ -492,18 +496,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
private void writeKVStateMetaData() throws IOException, InterruptedException {
- //write number of k/v states
- outputView.writeInt(stateBackend.kvStateInformation.size());
+
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+ new ArrayList<>(stateBackend.kvStateInformation.size());
int kvStateId = 0;
- //iterate all column families, where each column family holds one k/v state, to write the metadata
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) {
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> column :
+ stateBackend.kvStateInformation.entrySet()) {
+
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = column.getValue().f1;
- //be cooperative and check for interruption from time to time in the hot loop
- checkInterrupted();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
+ new KeyedBackendSerializationProxy.StateMetaInfo<>(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
- //write StateDescriptor for this k/v state
- InstantiationUtil.serializeObject(outStream, column.getValue().f1);
+ metaInfoList.add(metaInfoProxy);
//retrieve iterator for this k/v states
readOptions = new ReadOptions();
@@ -512,6 +522,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
++kvStateId;
}
+
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoList);
+
+ serializationProxy.write(outputView);
}
private void writeKVStateData() throws IOException, InterruptedException {
@@ -691,30 +706,35 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @throws RocksDBException
*/
private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
- //read number of k/v states
- int numColumns = currentStateHandleInView.readInt();
- //those two lists are aligned and should later have the same size!
- currentStateHandleKVStateColumnFamilies = new ArrayList<>(numColumns);
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
- //restore the empty columns for the k/v states through the metadata
- for (int i = 0; i < numColumns; i++) {
+ serializationProxy.read(currentStateHandleInView);
- StateDescriptor<?, ?> stateDescriptor = InstantiationUtil.deserializeObject(
- currentStateHandleInStream,
- rocksDBKeyedStateBackend.userCodeClassLoader);
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList =
+ serializationProxy.getNamedStateSerializationProxies();
- Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> columnFamily = rocksDBKeyedStateBackend.
- kvStateInformation.get(stateDescriptor.getName());
+ currentStateHandleKVStateColumnFamilies = new ArrayList<>(metaInfoProxyList.size());
+
+ for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy : metaInfoProxyList) {
+ Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> columnFamily =
+ rocksDBKeyedStateBackend.kvStateInformation.get(metaInfoProxy.getStateName());
if (null == columnFamily) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
- stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+ metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+
+ RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
- columnFamily = new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(
- rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+ columnFamily = new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+ rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor),
+ stateMetaInfo);
- rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
+ rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), columnFamily);
+ } else {
+ //TODO we could check here for incompatible serializer versions between previous tasks
}
currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
@@ -776,15 +796,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* <p>This also checks whether the {@link StateDescriptor} for a state matches the one
* that we checkpointed, i.e. is already in the map of column families.
*/
- protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> descriptor) {
+ @SuppressWarnings("rawtypes, unchecked")
+ protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) {
+
+ Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
+ kvStateInformation.get(descriptor.getName());
- Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName());
+ RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
+ descriptor.getType(),
+ descriptor.getName(),
+ namespaceSerializer,
+ descriptor.getSerializer());
if (stateInfo != null) {
- if (!stateInfo.f1.equals(descriptor)) {
- throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 +
- " trying access with " + descriptor);
+ if (!newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+ throw new RuntimeException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
+ " trying access with " + newMetaInfo);
}
+ stateInfo.f1 = newMetaInfo;
return stateInfo.f0;
}
@@ -793,9 +822,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
- Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple =
- new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(columnFamily, descriptor);
- kvStateInformation.put(descriptor.getName(), tuple);
+ Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<N, S>> tuple =
+ new Tuple2<>(columnFamily, newMetaInfo);
+ Map rawAccess = kvStateInformation;
+ rawAccess.put(descriptor.getName(), tuple);
return columnFamily;
} catch (RocksDBException e) {
throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
@@ -806,7 +836,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -815,7 +845,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -824,7 +854,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -833,7 +863,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -1116,7 +1146,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyMapping.put(mappingByte, stateDescriptor);
// this will fill in the k/v state information
- getColumnFamily(stateDescriptor);
+ getColumnFamily(stateDescriptor, null);
}
// try and read until EOF
@@ -1124,7 +1154,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// the EOFException will get us out of this...
while (true) {
byte mappingByte = inputView.readByte();
- ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+ ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte), null);
byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 32fa9f3..143945e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -138,4 +138,9 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
", foldFunction=" + foldFunction +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.FOLDING;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 077109c..6861a07 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -101,4 +101,9 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
"serializer=" + serializer +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.LIST;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 8d79da4..a1d4225 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -126,4 +126,9 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
", reduceFunction=" + reduceFunction +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.REDUCING;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 483c954..de3cd4e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -48,6 +48,11 @@ import static java.util.Objects.requireNonNull;
*/
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
+
+ public enum Type {
+ VALUE, LIST, REDUCING, FOLDING, @Deprecated UNKNOWN
+ }
+
private static final long serialVersionUID = 1L;
/** Name that uniquely identifies state created from this StateDescriptor. */
@@ -267,6 +272,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
'}';
}
+ public abstract Type getType();
+
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 10bcd58..7db9116 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -110,4 +110,9 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
", serializer=" + serializer +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.VALUE;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 5e81db7..ac7fbc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,13 +18,13 @@
package org.apache.flink.api.common.typeutils;
-import java.io.IOException;
-import java.io.Serializable;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import java.io.IOException;
+import java.io.Serializable;
+
/**
* This interface describes the methods that are required for a data type to be handled by the Flink
* runtime. Specifically, this interface contains the serialization and copying methods.
@@ -160,4 +160,8 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract boolean canEqual(Object obj);
public abstract int hashCode();
+
+ public boolean isCompatibleWith(TypeSerializer<?> other) {
+ return equals(other);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
new file mode 100644
index 0000000..06ad8bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {
+
+ public static final int VERSION = 1;
+ private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
+
+ private ClassLoader userClassLoader;
+
+ private TypeSerializer<T> typeSerializer;
+
+ private boolean ignoreClassNotFound;
+
+ public TypeSerializerSerializationProxy(ClassLoader userClassLoader, boolean ignoreClassNotFound) {
+ this.userClassLoader = userClassLoader;
+ this.ignoreClassNotFound = ignoreClassNotFound;
+ }
+
+ public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
+ this(userClassLoader, false);
+ }
+
+ public TypeSerializerSerializationProxy(TypeSerializer<T> typeSerializer) {
+ this.typeSerializer = Preconditions.checkNotNull(typeSerializer);
+ this.ignoreClassNotFound = false;
+ }
+
+ public TypeSerializer<T> getTypeSerializer() {
+ return typeSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+
+ if (typeSerializer instanceof ClassNotFoundDummyTypeSerializer) {
+ ClassNotFoundDummyTypeSerializer<T> dummyTypeSerializer =
+ (ClassNotFoundDummyTypeSerializer<T>) this.typeSerializer;
+
+ byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
+ out.writeInt(serializerBytes.length);
+ out.write(serializerBytes);
+ } else {
+ // write in a way that allows the stream to recover from exceptions
+ try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
+ InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
+ out.writeInt(streamWithPos.getPosition());
+ out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
+ }
+ }
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ // read in a way that allows the stream to recover from exceptions
+ int serializerBytes = in.readInt();
+ byte[] buffer = new byte[serializerBytes];
+ in.readFully(buffer);
+ try {
+ typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
+ } catch (ClassNotFoundException e) {
+ if (ignoreClassNotFound) {
+ // we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
+ // a proper typeserializer from the user
+ typeSerializer =
+ new ClassNotFoundDummyTypeSerializer<>(buffer);
+ LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
+ } else {
+ throw new IOException("Missing class for type serializer.", e);
+ }
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TypeSerializerSerializationProxy<?> that = (TypeSerializerSerializationProxy<?>) o;
+
+ return getTypeSerializer() != null ? getTypeSerializer().equals(that.getTypeSerializer()) : that.getTypeSerializer() == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return getTypeSerializer() != null ? getTypeSerializer().hashCode() : 0;
+ }
+
+ public boolean isIgnoreClassNotFound() {
+ return ignoreClassNotFound;
+ }
+
+ public void setIgnoreClassNotFound(boolean ignoreClassNotFound) {
+ this.ignoreClassNotFound = ignoreClassNotFound;
+ }
+
+ /**
+ * Dummy TypeSerializer to avoid that data is lost when checkpointing again a serializer for which we encountered
+ * a {@link ClassNotFoundException}.
+ */
+ static final class ClassNotFoundDummyTypeSerializer<T> extends TypeSerializer<T> {
+
+ private static final long serialVersionUID = 2526330533671642711L;
+ private final byte[] actualBytes;
+
+ public ClassNotFoundDummyTypeSerializer(byte[] actualBytes) {
+ this.actualBytes = Preconditions.checkNotNull(actualBytes);
+ }
+
+ public byte[] getActualBytes() {
+ return actualBytes;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public TypeSerializer<T> duplicate() {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public T createInstance() {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public T copy(T from) {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public T copy(T from, T reuse) {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public int getLength() {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public void serialize(T record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public T deserialize(DataInputView source) throws IOException {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public T deserialize(T reuse, DataInputView source) throws IOException {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return false;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ClassNotFoundDummyTypeSerializer<?> that = (ClassNotFoundDummyTypeSerializer<?>) o;
+
+ return Arrays.equals(getActualBytes(), that.getActualBytes());
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(getActualBytes());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
new file mode 100644
index 0000000..3ff88e9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * This exception signals that incompatible versions have been found during serialization.
+ */
+public class VersionMismatchException extends IOException {
+
+ private static final long serialVersionUID = 7024258967585372438L;
+
+ public VersionMismatchException() {
+ }
+
+ public VersionMismatchException(String message) {
+ super(message);
+ }
+
+ public VersionMismatchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public VersionMismatchException(Throwable cause) {
+ super(cause);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
new file mode 100644
index 0000000..94c2722
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * This is the abstract base class for {@link IOReadableWritable} which allows to differentiate between serialization
+ * versions. Concrete subclasses should typically override the {@link #write(DataOutputView)} and
+ * {@link #read(DataInputView)}, thereby calling super to ensure version checking.
+ */
+public abstract class VersionedIOReadableWritable implements IOReadableWritable, Versioned {
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(getVersion());
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ int foundVersion = in.readInt();
+ resolveVersionRead(foundVersion);
+ }
+
+ /**
+ * This method is a hook to react on the version tag that we find during read. This can also be used to initialize
+ * further read logic w.r.t. the version at hand.
+ * Default implementation of this method just checks the compatibility of a version number against the own version.
+ *
+ * @param foundVersion the version found from reading the input stream
+ * @throws VersionMismatchException thrown when serialization versions mismatch
+ */
+ protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+ if (!isCompatibleVersion(foundVersion)) {
+ long expectedVersion = getVersion();
+ throw new VersionMismatchException(
+ "Incompatible version: found " + foundVersion + ", required " + expectedVersion);
+ }
+ }
+
+ /**
+ * Checks for compatibility between this and the found version. Subclasses can override this method in case of
+ * intended backward compatibility.
+ *
+ * @param version version number to compare against.
+ * @return true, iff this is compatible to the passed version.
+ */
+ public boolean isCompatibleVersion(int version) {
+ return getVersion() == version;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index c25f491..46b82c7 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -115,4 +115,8 @@ public class ByteArrayInputStreamWithPos extends InputStream {
public int getPosition() {
return position;
}
+
+ public void setPos(int pos) {
+ this.position = pos;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index df5b34a..ebaf1b9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -114,4 +114,8 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
@Override
public void close() throws IOException {
}
+
+ public byte[] getBuf() {
+ return buffer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
new file mode 100644
index 0000000..982e7ff
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.fail;
+
+public class TypeSerializerSerializationProxyTest {
+
+ @Test
+ public void testStateSerializerSerializationProxy() throws Exception {
+
+ TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+ TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ proxy.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ proxy.read(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertEquals(serializer, proxy.getTypeSerializer());
+ }
+
+ @Test
+ public void testStateSerializerSerializationProxyClassNotFound() throws Exception {
+
+ TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+ TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ proxy.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ proxy.read(new DataInputViewStreamWrapper(in));
+ fail("ClassNotFoundException expected, leading to IOException");
+ } catch (IOException expected) {
+
+ }
+
+ proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ proxy.read(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
+
+ Assert.assertArrayEquals(
+ InstantiationUtil.serializeObject(serializer),
+ ((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
new file mode 100644
index 0000000..b7b6d6f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class VersionedIOWriteableTest {
+
+ @Test
+ public void testReadSameVersion() throws Exception {
+
+ String payload = "test";
+
+ TestWriteable testWriteable = new TestWriteable(1, payload);
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ testWriteable.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ testWriteable = new TestWriteable(1);
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ testWriteable.read(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertEquals(payload, testWriteable.getData());
+ }
+
+ @Test
+ public void testReadCompatibleVersion() throws Exception {
+
+ String payload = "test";
+
+ TestWriteable testWriteable = new TestWriteable(1, payload);
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ testWriteable.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ testWriteable = new TestWriteable(2) {
+ @Override
+ public boolean isCompatibleVersion(int version) {
+ return getVersion() >= version;
+ }
+ };
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ testWriteable.read(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertEquals(payload, testWriteable.getData());
+ }
+
+ @Test
+ public void testReadMismatchVersion() throws Exception {
+
+ String payload = "test";
+
+ TestWriteable testWriteable = new TestWriteable(1, payload);
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ testWriteable.write(new DataOutputViewStreamWrapper(out));
+ serialized = out.toByteArray();
+ }
+
+ testWriteable = new TestWriteable(2);
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ testWriteable.read(new DataInputViewStreamWrapper(in));
+ Assert.fail("Version mismatch expected.");
+ } catch (VersionMismatchException ignored) {
+
+ }
+
+ Assert.assertEquals(null, testWriteable.getData());
+ }
+
+ static class TestWriteable extends VersionedIOReadableWritable {
+
+ private final int version;
+ private String data;
+
+ public TestWriteable(int version) {
+ this(version, null);
+ }
+
+ public TestWriteable(int version, String data) {
+ this.version = version;
+ this.data = data;
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+ out.writeUTF(data);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+ this.data = in.readUTF();
+ }
+
+ @Override
+ protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+ super.resolveVersionRead(foundVersion);
+ }
+
+ @Override
+ public boolean isCompatibleVersion(int version) {
+ return super.isCompatibleVersion(version);
+ }
+
+ public String getData() {
+ return data;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 5b47362..d7a10d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -52,6 +53,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
private final Collection<OperatorStateHandle> restoreSnapshots;
private final CloseableRegistry closeStreamOnCancelRegistry;
private final JavaSerializer<Serializable> javaSerializer;
+ private final ClassLoader userClassloader;
/**
* Restores a OperatorStateStore (lazily) using the provided snapshots.
@@ -60,21 +62,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
*/
public DefaultOperatorStateBackend(
ClassLoader userClassLoader,
- Collection<OperatorStateHandle> restoreSnapshots) {
+ Collection<OperatorStateHandle> restoreSnapshots) throws IOException {
- Preconditions.checkNotNull(userClassLoader);
- this.javaSerializer = new JavaSerializer<>(userClassLoader);
- this.restoreSnapshots = restoreSnapshots;
+ this.userClassloader = Preconditions.checkNotNull(userClassLoader);
+ this.javaSerializer = new JavaSerializer<>();
this.registeredStates = new HashMap<>();
this.closeStreamOnCancelRegistry = new CloseableRegistry();
+ this.restoreSnapshots = restoreSnapshots;
+ restoreState();
}
/**
* Creates an empty OperatorStateStore.
*/
- public DefaultOperatorStateBackend(ClassLoader userClassLoader) {
+ public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
this(userClassLoader, null);
}
+
@SuppressWarnings("unchecked")
@Override
public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
@@ -82,8 +86,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
@Override
- public <S> ListState<S> getOperatorState(
- ListStateDescriptor<S> stateDescriptor) throws IOException {
+ public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws IOException {
Preconditions.checkNotNull(stateDescriptor);
@@ -95,41 +98,93 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (null == partitionableListState) {
- partitionableListState = new PartitionableListState<>(partitionStateSerializer);
-
+ partitionableListState = new PartitionableListState<>(name, partitionStateSerializer);
registeredStates.put(name, partitionableListState);
+ } else {
+ Preconditions.checkState(
+ partitionableListState.getPartitionStateSerializer().
+ isCompatibleWith(stateDescriptor.getSerializer()),
+ "Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() +
+ ", found: " + partitionableListState.getPartitionStateSerializer());
+ }
+
+ return partitionableListState;
+ }
+
+ private void restoreState() throws IOException {
+
+ if (null == restoreSnapshots) {
+ return;
+ }
- // Try to restore previous state if state handles to snapshots are provided
- if (restoreSnapshots != null) {
- for (OperatorStateHandle stateHandle : restoreSnapshots) {
- //TODO we coud be even more gc friendly be removing handles from the collections one the map is empty
- // search and remove to be gc friendly
- long[] offsets = stateHandle.getStateNameToPartitionOffsets().remove(name);
+ for (OperatorStateHandle stateHandle : restoreSnapshots) {
- if (offsets != null) {
+ if (stateHandle == null) {
+ continue;
+ }
+
+ FSDataInputStream in = stateHandle.openInputStream();
+ closeStreamOnCancelRegistry.registerClosable(in);
+
+ ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
- FSDataInputStream in = stateHandle.openInputStream();
- try {
- closeStreamOnCancelRegistry.registerClosable(in);
+ try {
+ Thread.currentThread().setContextClassLoader(userClassloader);
+ OperatorBackendSerializationProxy backendSerializationProxy =
+ new OperatorBackendSerializationProxy(userClassloader);
- DataInputView div = new DataInputViewStreamWrapper(in);
+ backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
- for (int i = 0; i < offsets.length; ++i) {
+ List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ backendSerializationProxy.getNamedStateSerializationProxies();
- in.seek(offsets[i]);
- S partitionState = partitionStateSerializer.deserialize(div);
- partitionableListState.add(partitionState);
- }
- } finally {
- closeStreamOnCancelRegistry.unregisterClosable(in);
- in.close();
- }
+ // Recreate all PartitionableListStates from the meta info
+ for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
+ PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName());
+
+ if (null == listState) {
+ listState = new PartitionableListState<>(
+ stateMetaInfo.getName(),
+ stateMetaInfo.getStateSerializer());
+
+ registeredStates.put(listState.getName(), listState);
+ } else {
+ Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith(
+ stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
+ listState.getPartitionStateSerializer() + " is not compatible with " +
+ stateMetaInfo.getStateSerializer());
}
}
+
+ // Restore all the state in PartitionableListStates
+ for (Map.Entry<String, long[]> nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
+ PartitionableListState<?> stateListForName = registeredStates.get(nameToOffsets.getKey());
+
+ Preconditions.checkState(null != stateListForName, "Found state without " +
+ "corresponding meta info: " + nameToOffsets.getKey());
+
+ deserializeStateValues(stateListForName, in, nameToOffsets.getValue());
+ }
+
+ } finally {
+ Thread.currentThread().setContextClassLoader(restoreClassLoader);
+ closeStreamOnCancelRegistry.unregisterClosable(in);
+ IOUtils.closeQuietly(in);
}
}
+ }
- return partitionableListState;
+ private static <S> void deserializeStateValues(
+ PartitionableListState<S> stateListForName,
+ FSDataInputStream in,
+ long[] offsets) throws IOException {
+
+ DataInputView div = new DataInputViewStreamWrapper(in);
+ TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer();
+ for (long offset : offsets) {
+ in.seek(offset);
+ stateListForName.add(serializer.deserialize(div));
+ }
}
@Override
@@ -140,6 +195,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return new DoneFuture<>(null);
}
+ List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ new ArrayList<>(registeredStates.size());
+
+ for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
+ PartitionableListState<?> state = entry.getValue();
+ OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
+ new OperatorBackendSerializationProxy.StateMetaInfo<>(
+ state.getName(),
+ state.getPartitionStateSerializer());
+ metaInfoList.add(metaInfo);
+ }
+
Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size());
CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
@@ -150,6 +217,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
DataOutputView dov = new DataOutputViewStreamWrapper(out);
+ OperatorBackendSerializationProxy backendSerializationProxy =
+ new OperatorBackendSerializationProxy(metaInfoList);
+
+ backendSerializationProxy.write(dov);
+
dov.writeInt(registeredStates.size());
for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
@@ -171,29 +243,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
registeredStates.clear();
}
+ @Override
+ public Set<String> getRegisteredStateNames() {
+ return registeredStates.keySet();
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeStreamOnCancelRegistry.close();
+ }
+
static final class PartitionableListState<S> implements ListState<S> {
private final List<S> internalList;
+ private final String name;
private final TypeSerializer<S> partitionStateSerializer;
- public PartitionableListState(TypeSerializer<S> partitionStateSerializer) {
+ public PartitionableListState(String name, TypeSerializer<S> partitionStateSerializer) {
this.internalList = new ArrayList<>();
this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
- }
-
- @Override
- public void clear() {
- internalList.clear();
- }
-
- @Override
- public Iterable<S> get() {
- return internalList;
- }
-
- @Override
- public void add(S value) {
- internalList.add(value);
+ this.name = Preconditions.checkNotNull(name);
}
public long[] write(FSDataOutputStream out) throws IOException {
@@ -215,6 +284,29 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return internalList;
}
+ public String getName() {
+ return name;
+ }
+
+ public TypeSerializer<S> getPartitionStateSerializer() {
+ return partitionStateSerializer;
+ }
+
+ @Override
+ public void clear() {
+ internalList.clear();
+ }
+
+ @Override
+ public Iterable<S> get() {
+ return internalList;
+ }
+
+ @Override
+ public void add(S value) {
+ internalList.add(value);
+ }
+
@Override
public String toString() {
return "PartitionableListState{" +
@@ -222,16 +314,5 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
'}';
}
}
-
- @Override
- public Set<String> getRegisteredStateNames() {
- return registeredStates.keySet();
- }
-
- @Override
- public void close() throws IOException {
- closeStreamOnCancelRegistry.close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 2eb9595..512baf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
@@ -34,15 +33,7 @@ import java.io.Serializable;
@Internal
final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
- private final ClassLoader userClassLoader;
-
- public JavaSerializer() {
- this(Thread.currentThread().getContextClassLoader());
- }
-
- public JavaSerializer(ClassLoader userClassLoader) {
- this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
- }
+ private static final long serialVersionUID = 5067491650263321234L;
@Override
public boolean isImmutableType() {
@@ -87,7 +78,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
@Override
public T deserialize(DataInputView source) throws IOException {
try {
- return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
+ return InstantiationUtil.deserializeObject(
+ new DataInputViewStream(source), Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Could not deserialize object.", e);
}
@@ -107,7 +99,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
@Override
public boolean equals(Object obj) {
- return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
+ return obj instanceof JavaSerializer;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
new file mode 100644
index 0000000..dbee6cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+
+ private static final int VERSION = 1;
+
+ private TypeSerializerSerializationProxy<?> keySerializerProxy;
+ private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+
+ private ClassLoader userCodeClassLoader;
+
+ public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+ this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+ }
+
+ public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
+ this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
+ this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+ Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+ }
+
+ public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
+ return namedStateSerializationProxies;
+ }
+
+ public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
+ return keySerializerProxy;
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+
+ keySerializerProxy.write(out);
+
+ out.writeShort(namedStateSerializationProxies.size());
+
+ for (StateMetaInfo<?, ?> kvState : namedStateSerializationProxies) {
+ kvState.write(out);
+ }
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ keySerializerProxy.read(in);
+
+ int numKvStates = in.readShort();
+ namedStateSerializationProxies = new ArrayList<>(numKvStates);
+ for (int i = 0; i < numKvStates; ++i) {
+ StateMetaInfo<?, ?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
+ stateSerializationProxy.read(in);
+ namedStateSerializationProxies.add(stateSerializationProxy);
+ }
+ }
+
+//----------------------------------------------------------------------------------------------------------------------
+
+ /**
+ * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
+ * keyed backend.
+ */
+ public static class StateMetaInfo<N, S> implements IOReadableWritable {
+
+ private StateDescriptor.Type stateType;
+ private String stateName;
+ private TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy;
+ private TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy;
+
+ private ClassLoader userClassLoader;
+
+ StateMetaInfo(ClassLoader userClassLoader) {
+ this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+ }
+
+ public StateMetaInfo(
+ StateDescriptor.Type stateType,
+ String name,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer) {
+
+ this.stateType = Preconditions.checkNotNull(stateType);
+ this.stateName = Preconditions.checkNotNull(name);
+ this.namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer));
+ this.stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer));
+ }
+
+ public StateDescriptor.Type getStateType() {
+ return stateType;
+ }
+
+ public void setStateType(StateDescriptor.Type stateType) {
+ this.stateType = stateType;
+ }
+
+ public String getStateName() {
+ return stateName;
+ }
+
+ public void setStateName(String stateName) {
+ this.stateName = stateName;
+ }
+
+ public TypeSerializerSerializationProxy<N> getNamespaceSerializerSerializationProxy() {
+ return namespaceSerializerSerializationProxy;
+ }
+
+ public void setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy) {
+ this.namespaceSerializerSerializationProxy = namespaceSerializerSerializationProxy;
+ }
+
+ public TypeSerializerSerializationProxy<S> getStateSerializerSerializationProxy() {
+ return stateSerializerSerializationProxy;
+ }
+
+ public void setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy) {
+ this.stateSerializerSerializationProxy = stateSerializerSerializationProxy;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeInt(getStateType().ordinal());
+ out.writeUTF(getStateName());
+
+ getNamespaceSerializerSerializationProxy().write(out);
+ getStateSerializerSerializationProxy().write(out);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ int enumOrdinal = in.readInt();
+ setStateType(StateDescriptor.Type.values()[enumOrdinal]);
+ setStateName(in.readUTF());
+
+ namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
+ namespaceSerializerSerializationProxy.read(in);
+
+ stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
+ stateSerializerSerializationProxy.read(in);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o;
+
+ if (!getStateName().equals(that.getStateName())) {
+ return false;
+ }
+
+ if (!getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy())) {
+ return false;
+ }
+
+ return getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getStateName().hashCode();
+ result = 31 * result + getNamespaceSerializerSerializationProxy().hashCode();
+ result = 31 * result + getStateSerializerSerializationProxy().hashCode();
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
new file mode 100644
index 0000000..61df979
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
+
+ private static final int VERSION = 1;
+
+ private List<StateMetaInfo<?>> namedStateSerializationProxies;
+ private ClassLoader userCodeClassLoader;
+
+ public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+ this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+ }
+
+ public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> namedStateSerializationProxies) {
+ this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+ Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+
+ out.writeShort(namedStateSerializationProxies.size());
+
+ for (StateMetaInfo<?> kvState : namedStateSerializationProxies) {
+ kvState.write(out);
+ }
+ }
+
+ @Override
+ public void read(DataInputView out) throws IOException {
+ super.read(out);
+
+ int numKvStates = out.readShort();
+ namedStateSerializationProxies = new ArrayList<>(numKvStates);
+ for (int i = 0; i < numKvStates; ++i) {
+ StateMetaInfo<?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
+ stateSerializationProxy.read(out);
+ namedStateSerializationProxies.add(stateSerializationProxy);
+ }
+ }
+
+ public List<StateMetaInfo<?>> getNamedStateSerializationProxies() {
+ return namedStateSerializationProxies;
+ }
+
+ //----------------------------------------------------------------------------------------------------------------------
+
+ public static class StateMetaInfo<S> implements IOReadableWritable {
+
+ private String name;
+ private TypeSerializer<S> stateSerializer;
+ private ClassLoader userClassLoader;
+
+ private StateMetaInfo(ClassLoader userClassLoader) {
+ this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+ }
+
+ public StateMetaInfo(String name, TypeSerializer<S> stateSerializer) {
+ this.name = Preconditions.checkNotNull(name);
+ this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ public void setStateSerializer(TypeSerializer<S> stateSerializer) {
+ this.stateSerializer = stateSerializer;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeUTF(getName());
+ DataOutputViewStream dos = new DataOutputViewStream(out);
+ InstantiationUtil.serializeObject(dos, getStateSerializer());
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ setName(in.readUTF());
+ DataInputViewStream dis = new DataInputViewStream(in);
+ try {
+ TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userClassLoader);
+ setStateSerializer(stateSerializer);
+ } catch (ClassNotFoundException exception) {
+ throw new IOException(exception);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
new file mode 100644
index 0000000..62418c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredBackendStateMetaInfo<N, S> {
+
+ private final StateDescriptor.Type stateType;
+ private final String name;
+ private final TypeSerializer<N> namespaceSerializer;
+ private final TypeSerializer<S> stateSerializer;
+
+ public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, S> metaInfoProxy) {
+ this.stateType = metaInfoProxy.getStateType();
+ this.name = metaInfoProxy.getStateName();
+ this.namespaceSerializer = metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer();
+ this.stateSerializer = metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer();
+ }
+
+ public RegisteredBackendStateMetaInfo(
+ StateDescriptor.Type stateType,
+ String name,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer) {
+
+ this.stateType = Preconditions.checkNotNull(stateType);
+ this.name = Preconditions.checkNotNull(name);
+ this.namespaceSerializer = namespaceSerializer;
+ this.stateSerializer = stateSerializer;
+ }
+
+ public StateDescriptor.Type getStateType() {
+ return stateType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) {
+
+ if (this == other) {
+ return true;
+ }
+
+ if (null == other) {
+ return false;
+ }
+
+ if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
+ && !other.stateType.equals(StateDescriptor.Type.UNKNOWN)
+ && !stateType.equals(other.stateType)) {
+ return false;
+ }
+
+ if (!name.equals(other.getName())) {
+ return false;
+ }
+
+ return namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+ && stateSerializer.isCompatibleWith(other.stateSerializer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RegisteredBackendStateMetaInfo<?, ?> that = (RegisteredBackendStateMetaInfo<?, ?>) o;
+
+ if (!stateType.equals(that.stateType)) {
+ return false;
+ }
+
+ if (!getName().equals(that.getName())) {
+ return false;
+ }
+
+ if (getNamespaceSerializer() != null ? !getNamespaceSerializer().equals(that.getNamespaceSerializer()) : that.getNamespaceSerializer() != null) {
+ return false;
+ }
+ return getStateSerializer() != null ? getStateSerializer().equals(that.getStateSerializer()) : that.getStateSerializer() == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getStateType().hashCode();
+ result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
+ result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6e85b72..d07901b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -49,6 +50,8 @@ import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -61,6 +64,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
@@ -122,53 +126,66 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// ------------------------------------------------------------------------
// state backend operations
// ------------------------------------------------------------------------
+
@SuppressWarnings("unchecked")
- @Override
- public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
- StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
+ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+
+ String name = stateDesc.getName();
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
+
+ RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
+
+ return tryRegisterStateTable(stateTable, newMetaInfo);
+ }
+
+ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
if (stateTable == null) {
- stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
+ stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
+ stateTables.put(newMetaInfo.getName(), stateTable);
+ } else {
+ if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+ throw new RuntimeException("Trying to access state using incompatible meta info, was " +
+ stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+ }
+ stateTable.setMetaInfo(newMetaInfo);
}
+ return stateTable;
+ }
+ @SuppressWarnings("unchecked")
+ @Override
+ public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
+ StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@SuppressWarnings("unchecked")
@Override
public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
- StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(stateDesc.getName());
+ String name = stateDesc.getName();
+ StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name);
- if (stateTable == null) {
- stateTable = new StateTable<>(new ArrayListSerializer<>(stateDesc.getSerializer()), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
- }
+ RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()));
+ stateTable = tryRegisterStateTable(stateTable, newMetaInfo);
return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
+
@SuppressWarnings("unchecked")
@Override
public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
- StateTable<K, N, T> stateTable = (StateTable<K, N, T>) stateTables.get(stateDesc.getName());
-
- if (stateTable == null) {
- stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
- }
-
+ StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@SuppressWarnings("unchecked")
@Override
protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- StateTable<K, N, ACC> stateTable = (StateTable<K, N, ACC>) stateTables.get(stateDesc.getName());
-
- if (stateTable == null) {
- stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
- }
-
+ StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@@ -192,23 +209,28 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Too many KV-States: " + stateTables.size() +
". Currently at most " + Short.MAX_VALUE + " states are supported");
- outView.writeShort(stateTables.size());
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- outView.writeUTF(kvState.getKey());
-
- TypeSerializer<?> namespaceSerializer = kvState.getValue().getNamespaceSerializer();
- TypeSerializer<?> stateSerializer = kvState.getValue().getStateSerializer();
-
- InstantiationUtil.serializeObject(stream, namespaceSerializer);
- InstantiationUtil.serializeObject(stream, stateSerializer);
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
+ metaInfoProxyList.add(metaInfoProxy);
kVStateToId.put(kvState.getKey(), kVStateToId.size());
}
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+ serializationProxy.write(outView);
+
int offsetCounter = 0;
long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
@@ -278,23 +300,27 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
- int numKvStates = inView.readShort();
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(userCodeClassLoader);
- for (int i = 0; i < numKvStates; ++i) {
- String stateName = inView.readUTF();
+ serializationProxy.read(inView);
- TypeSerializer<?> namespaceSerializer =
- InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
- TypeSerializer<?> stateSerializer =
- InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+ serializationProxy.getNamedStateSerializationProxies();
- StateTable<K, ?, ?> stateTable = stateTables.get(stateName);
+ for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+
+ StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
//important: only create a new table we did not already create it previously
if (null == stateTable) {
- stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
- stateTables.put(stateName, stateTable);
- kvStatesById.put(numRegisteredKvStates, stateName);
+
+ RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+ stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
+ stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+ kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
++numRegisteredKvStates;
}
}
@@ -307,7 +333,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int writtenKeyGroupIndex = inView.readInt();
assert writtenKeyGroupIndex == keyGroupIndex;
- for (int i = 0; i < numKvStates; i++) {
+ for (int i = 0; i < metaInfoList.size(); i++) {
int kvStateId = inView.readShort();
byte isPresent = inView.readByte();
@@ -419,11 +445,18 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
}
- StateTable<K, ?, ?> stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
+ RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ nameToState.getKey(),
+ namespaceSerializer,
+ stateSerializer);
+
+ StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
stateTable.getState().set(0, rawResultMap);
// add named state to the backend
- getStateTables().put(nameToState.getKey(), stateTable);
+ getStateTables().put(registeredBackendStateMetaInfo.getName(), stateTable);
}
}