You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/12/14 16:58:01 UTC
[2/5] flink git commit: [FLINK-5051] Add Serde Proxies for
Serializers and State Backend Data
[FLINK-5051] Add Serde Proxies for Serializers and State Backend Data
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21d1d8b4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21d1d8b4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21d1d8b4
Branch: refs/heads/master
Commit: 21d1d8b49337e734ce3defb5f1b9344f748cb49e
Parents: e95fe56
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Nov 24 17:29:24 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 108 +++++----
.../common/state/FoldingStateDescriptor.java | 5 +
.../api/common/state/ListStateDescriptor.java | 5 +
.../common/state/ReducingStateDescriptor.java | 5 +
.../flink/api/common/state/StateDescriptor.java | 7 +
.../api/common/state/ValueStateDescriptor.java | 5 +
.../api/common/typeutils/TypeSerializer.java | 10 +-
.../TypeSerializerSerializationProxy.java | 231 +++++++++++++++++++
.../flink/core/io/VersionMismatchException.java | 44 ++++
.../core/io/VersionedIOReadableWritable.java | 70 ++++++
.../memory/ByteArrayInputStreamWithPos.java | 4 +
.../memory/ByteArrayOutputStreamWithPos.java | 4 +
.../TypeSerializerSerializationProxyTest.java | 94 ++++++++
.../flink/core/io/VersionedIOWriteableTest.java | 148 ++++++++++++
.../state/DefaultOperatorStateBackend.java | 195 +++++++++++-----
.../flink/runtime/state/JavaSerializer.java | 16 +-
.../state/KeyedBackendSerializationProxy.java | 215 +++++++++++++++++
.../OperatorBackendSerializationProxy.java | 140 +++++++++++
.../state/RegisteredBackendStateMetaInfo.java | 132 +++++++++++
.../state/heap/HeapKeyedStateBackend.java | 125 ++++++----
.../flink/runtime/state/heap/StateTable.java | 26 ++-
.../runtime/query/QueryableStateClientTest.java | 12 +-
.../runtime/state/OperatorStateBackendTest.java | 3 +-
.../runtime/state/SerializationProxiesTest.java | 99 ++++++++
.../runtime/state/StateBackendTestBase.java | 6 +-
25 files changed, 1534 insertions(+), 175 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5fef5e5..1c0a4b7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -50,6 +50,8 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.InstantiationUtil;
@@ -123,7 +125,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* Information about the k/v states as we create them. This is used to retrieve the
* column family that is used for a state and also for sanity checks when restoring.
*/
- private Map<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> kvStateInformation;
+ private Map<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> kvStateInformation;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -237,7 +239,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// and access it in a synchronized block that locks on #dbDisposeLock.
if (db != null) {
- for (Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> column : kvStateInformation.values()) {
+ for (Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> column :
+ kvStateInformation.values()) {
+
column.f0.close();
}
@@ -492,18 +496,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
private void writeKVStateMetaData() throws IOException, InterruptedException {
- //write number of k/v states
- outputView.writeInt(stateBackend.kvStateInformation.size());
+
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+ new ArrayList<>(stateBackend.kvStateInformation.size());
int kvStateId = 0;
- //iterate all column families, where each column family holds one k/v state, to write the metadata
- for (Map.Entry<String, Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>> column : stateBackend.kvStateInformation.entrySet()) {
+ for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>> column :
+ stateBackend.kvStateInformation.entrySet()) {
+
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = column.getValue().f1;
- //be cooperative and check for interruption from time to time in the hot loop
- checkInterrupted();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
+ new KeyedBackendSerializationProxy.StateMetaInfo<>(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
- //write StateDescriptor for this k/v state
- InstantiationUtil.serializeObject(outStream, column.getValue().f1);
+ metaInfoList.add(metaInfoProxy);
//retrieve iterator for this k/v states
readOptions = new ReadOptions();
@@ -512,6 +522,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
kvStateIterators.add(new Tuple2<>(iterator, kvStateId));
++kvStateId;
}
+
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoList);
+
+ serializationProxy.write(outputView);
}
private void writeKVStateData() throws IOException, InterruptedException {
@@ -691,30 +706,35 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @throws RocksDBException
*/
private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
- //read number of k/v states
- int numColumns = currentStateHandleInView.readInt();
- //those two lists are aligned and should later have the same size!
- currentStateHandleKVStateColumnFamilies = new ArrayList<>(numColumns);
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
- //restore the empty columns for the k/v states through the metadata
- for (int i = 0; i < numColumns; i++) {
+ serializationProxy.read(currentStateHandleInView);
- StateDescriptor<?, ?> stateDescriptor = InstantiationUtil.deserializeObject(
- currentStateHandleInStream,
- rocksDBKeyedStateBackend.userCodeClassLoader);
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList =
+ serializationProxy.getNamedStateSerializationProxies();
- Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> columnFamily = rocksDBKeyedStateBackend.
- kvStateInformation.get(stateDescriptor.getName());
+ currentStateHandleKVStateColumnFamilies = new ArrayList<>(metaInfoProxyList.size());
+
+ for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy : metaInfoProxyList) {
+ Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> columnFamily =
+ rocksDBKeyedStateBackend.kvStateInformation.get(metaInfoProxy.getStateName());
if (null == columnFamily) {
ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
- stateDescriptor.getName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+ metaInfoProxy.getStateName().getBytes(), rocksDBKeyedStateBackend.columnOptions);
+
+ RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(metaInfoProxy);
- columnFamily = new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(
- rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor), stateDescriptor);
+ columnFamily = new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
+ rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor),
+ stateMetaInfo);
- rocksDBKeyedStateBackend.kvStateInformation.put(stateDescriptor.getName(), columnFamily);
+ rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), columnFamily);
+ } else {
+ //TODO we could check here for incompatible serializer versions between previous tasks
}
currentStateHandleKVStateColumnFamilies.add(columnFamily.f0);
@@ -776,15 +796,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* <p>This also checks whether the {@link StateDescriptor} for a state matches the one
* that we checkpointed, i.e. is already in the map of column families.
*/
- protected ColumnFamilyHandle getColumnFamily(StateDescriptor<?, ?> descriptor) {
+ @SuppressWarnings("rawtypes, unchecked")
+ protected <N, S> ColumnFamilyHandle getColumnFamily(StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) {
+
+ Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> stateInfo =
+ kvStateInformation.get(descriptor.getName());
- Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> stateInfo = kvStateInformation.get(descriptor.getName());
+ RegisteredBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredBackendStateMetaInfo<>(
+ descriptor.getType(),
+ descriptor.getName(),
+ namespaceSerializer,
+ descriptor.getSerializer());
if (stateInfo != null) {
- if (!stateInfo.f1.equals(descriptor)) {
- throw new RuntimeException("Trying to access state using wrong StateDescriptor, was " + stateInfo.f1 +
- " trying access with " + descriptor);
+ if (!newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+ throw new RuntimeException("Trying to access state using wrong meta info, was " + stateInfo.f1 +
+ " trying access with " + newMetaInfo);
}
+ stateInfo.f1 = newMetaInfo;
return stateInfo.f0;
}
@@ -793,9 +822,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
- Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>> tuple =
- new Tuple2<ColumnFamilyHandle, StateDescriptor<?, ?>>(columnFamily, descriptor);
- kvStateInformation.put(descriptor.getName(), tuple);
+ Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<N, S>> tuple =
+ new Tuple2<>(columnFamily, newMetaInfo);
+ Map rawAccess = kvStateInformation;
+ rawAccess.put(descriptor.getName(), tuple);
return columnFamily;
} catch (RocksDBException e) {
throw new RuntimeException("Error creating ColumnFamilyHandle.", e);
@@ -806,7 +836,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -815,7 +845,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -824,7 +854,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -833,7 +863,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer,
FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc);
+ ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
}
@@ -1116,7 +1146,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
columnFamilyMapping.put(mappingByte, stateDescriptor);
// this will fill in the k/v state information
- getColumnFamily(stateDescriptor);
+ getColumnFamily(stateDescriptor, null);
}
// try and read until EOF
@@ -1124,7 +1154,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// the EOFException will get us out of this...
while (true) {
byte mappingByte = inputView.readByte();
- ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte));
+ ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte), null);
byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
ByteArrayInputStreamWithPos bis = new ByteArrayInputStreamWithPos(keyAndNamespace);
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
index 32fa9f3..143945e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingStateDescriptor.java
@@ -138,4 +138,9 @@ public class FoldingStateDescriptor<T, ACC> extends StateDescriptor<FoldingState
", foldFunction=" + foldFunction +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.FOLDING;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
index 077109c..6861a07 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
@@ -101,4 +101,9 @@ public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, T> {
"serializer=" + serializer +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.LIST;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
index 8d79da4..a1d4225 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ReducingStateDescriptor.java
@@ -126,4 +126,9 @@ public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>
", reduceFunction=" + reduceFunction +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.REDUCING;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 483c954..de3cd4e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -48,6 +48,11 @@ import static java.util.Objects.requireNonNull;
*/
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
+
+ public enum Type {
+ VALUE, LIST, REDUCING, FOLDING, @Deprecated UNKNOWN
+ }
+
private static final long serialVersionUID = 1L;
/** Name that uniquely identifies state created from this StateDescriptor. */
@@ -267,6 +272,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
'}';
}
+ public abstract Type getType();
+
// ------------------------------------------------------------------------
// Serialization
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
index 10bcd58..7db9116 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/ValueStateDescriptor.java
@@ -110,4 +110,9 @@ public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
", serializer=" + serializer +
'}';
}
+
+ @Override
+ public Type getType() {
+ return Type.VALUE;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 5e81db7..ac7fbc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -18,13 +18,13 @@
package org.apache.flink.api.common.typeutils;
-import java.io.IOException;
-import java.io.Serializable;
-
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import java.io.IOException;
+import java.io.Serializable;
+
/**
* This interface describes the methods that are required for a data type to be handled by the Flink
* runtime. Specifically, this interface contains the serialization and copying methods.
@@ -160,4 +160,8 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract boolean canEqual(Object obj);
public abstract int hashCode();
+
+ public boolean isCompatibleWith(TypeSerializer<?> other) {
+ return equals(other);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
new file mode 100644
index 0000000..06ad8bf
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Versioned serialization proxy for a {@link TypeSerializer}. The serializer is written as a Java-serialized
+ * byte blob behind an int length prefix, so that the stream position stays consistent even if the serializer
+ * class cannot be loaded when reading.
+ *
+ * @param <T> the type handled by the proxied serializer
+ */
+public class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {
+
+public static final int VERSION = 1;
+private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);
+
+private ClassLoader userClassLoader;
+
+private TypeSerializer<T> typeSerializer;
+
+private boolean ignoreClassNotFound;
+
+/**
+ * Creates a proxy for reading a serializer.
+ *
+ * @param userClassLoader the class loader used to deserialize the serializer
+ * @param ignoreClassNotFound if true, a missing serializer class yields a dummy serializer instead of an error
+ */
+public TypeSerializerSerializationProxy(ClassLoader userClassLoader, boolean ignoreClassNotFound) {
+this.userClassLoader = userClassLoader;
+this.ignoreClassNotFound = ignoreClassNotFound;
+}
+
+/** Creates a proxy for reading a serializer; a missing serializer class makes {@link #read} fail. */
+public TypeSerializerSerializationProxy(ClassLoader userClassLoader) {
+this(userClassLoader, false);
+}
+
+/** Creates a proxy for writing the given serializer, which must not be null. */
+public TypeSerializerSerializationProxy(TypeSerializer<T> typeSerializer) {
+this.typeSerializer = Preconditions.checkNotNull(typeSerializer);
+this.ignoreClassNotFound = false;
+}
+
+/** Returns the proxied serializer; null on a reading proxy before {@link #read} was called. */
+public TypeSerializer<T> getTypeSerializer() {
+return typeSerializer;
+}
+
+@Override
+public int getVersion() {
+return VERSION;
+}
+
+/**
+ * Writes the version header, an int length prefix and the Java-serialized serializer bytes. A
+ * {@link ClassNotFoundDummyTypeSerializer} re-emits the original bytes it was created from.
+ */
+@Override
+public void write(DataOutputView out) throws IOException {
+super.write(out);
+
+if (typeSerializer instanceof ClassNotFoundDummyTypeSerializer) {
+ClassNotFoundDummyTypeSerializer<T> dummyTypeSerializer =
+(ClassNotFoundDummyTypeSerializer<T>) this.typeSerializer;
+
+byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
+// writeInt() matches the readInt() in read(); write(int) would emit only the lowest-order byte
+out.writeInt(serializerBytes.length);
+out.write(serializerBytes);
+} else {
+// write in a way that allows the stream to recover from exceptions
+try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
+InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
+out.writeInt(streamWithPos.getPosition());
+out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
+}
+}
+}
+
+/**
+ * Reads the version header, the int length prefix and the serializer bytes, then deserializes the
+ * serializer with the user class loader. If the serializer class cannot be found and missing classes
+ * are ignored, a {@link ClassNotFoundDummyTypeSerializer} holding the raw bytes is used instead.
+ *
+ * @throws IOException if reading fails, or if the serializer class is missing and not ignored
+ */
+@Override
+public void read(DataInputView in) throws IOException {
+super.read(in);
+
+// read in a way that allows the stream to recover from exceptions
+int serializerBytes = in.readInt();
+byte[] buffer = new byte[serializerBytes];
+// readFully() consumes exactly buffer.length bytes; read() may return after fewer
+in.readFully(buffer);
+try {
+typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader);
+} catch (ClassNotFoundException e) {
+if (ignoreClassNotFound) {
+// keep the raw bytes in a dummy so they are not lost if a new checkpoint is taken before
+// the user provides a proper type serializer
+typeSerializer =
+new ClassNotFoundDummyTypeSerializer<>(buffer);
+LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
+} else {
+throw new IOException("Missing class for type serializer.", e);
+}
+}
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+
+TypeSerializerSerializationProxy<?> that = (TypeSerializerSerializationProxy<?>) o;
+
+return getTypeSerializer() != null ? getTypeSerializer().equals(that.getTypeSerializer()) : that.getTypeSerializer() == null;
+}
+
+@Override
+public int hashCode() {
+return getTypeSerializer() != null ? getTypeSerializer().hashCode() : 0;
+}
+
+/** Returns whether a missing serializer class on read yields a dummy serializer instead of an error. */
+public boolean isIgnoreClassNotFound() {
+return ignoreClassNotFound;
+}
+
+/** Sets whether a missing serializer class on read yields a dummy serializer instead of an error. */
+public void setIgnoreClassNotFound(boolean ignoreClassNotFound) {
+this.ignoreClassNotFound = ignoreClassNotFound;
+}
+
+/**
+ * Placeholder TypeSerializer that keeps the raw bytes of a serializer whose class could not be loaded
+ * ({@link ClassNotFoundException}), so that a later checkpoint can write them again instead of losing them.
+ * All serializer operations throw {@link UnsupportedOperationException}.
+ */
+static final class ClassNotFoundDummyTypeSerializer<T> extends TypeSerializer<T> {
+
+private static final long serialVersionUID = 2526330533671642711L;
+private final byte[] actualBytes;
+
+public ClassNotFoundDummyTypeSerializer(byte[] actualBytes) {
+this.actualBytes = Preconditions.checkNotNull(actualBytes);
+}
+
+/** Returns the raw serialized bytes of the serializer that could not be loaded. */
+public byte[] getActualBytes() {
+return actualBytes;
+}
+
+@Override
+public boolean isImmutableType() {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public TypeSerializer<T> duplicate() {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public T createInstance() {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public T copy(T from) {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public T copy(T from, T reuse) {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public int getLength() {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public void serialize(T record, DataOutputView target) throws IOException {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public T deserialize(DataInputView source) throws IOException {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public T deserialize(T reuse, DataInputView source) throws IOException {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public void copy(DataInputView source, DataOutputView target) throws IOException {
+throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
+}
+
+@Override
+public boolean canEqual(Object obj) {
+// consistent with equals(): only another dummy can be equal to this one
+return obj instanceof ClassNotFoundDummyTypeSerializer;
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+
+ClassNotFoundDummyTypeSerializer<?> that = (ClassNotFoundDummyTypeSerializer<?>) o;
+
+return Arrays.equals(getActualBytes(), that.getActualBytes());
+}
+
+@Override
+public int hashCode() {
+return Arrays.hashCode(getActualBytes());
+}
+}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
new file mode 100644
index 0000000..3ff88e9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionMismatchException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import java.io.IOException;
+
+/**
+ * {@link IOException} signalling that incompatible versions were encountered during serialization.
+ */
+public class VersionMismatchException extends IOException {
+
+private static final long serialVersionUID = 7024258967585372438L;
+
+/** Creates the exception with neither a detail message nor a cause. */
+public VersionMismatchException() {
+super();
+}
+
+/** Creates the exception with the given detail message. */
+public VersionMismatchException(String message) {
+super(message);
+}
+
+/** Creates the exception with the given cause. */
+public VersionMismatchException(Throwable cause) {
+super(cause);
+}
+
+/** Creates the exception with the given detail message and cause. */
+public VersionMismatchException(String message, Throwable cause) {
+super(message, cause);
+}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
new file mode 100644
index 0000000..94c2722
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/io/VersionedIOReadableWritable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * This is the abstract base class for {@link IOReadableWritable} which allows to differentiate between serialization
+ * versions. Concrete subclasses should typically override {@link #write(DataOutputView)} and
+ * {@link #read(DataInputView)} and call the super implementations first, so that the version tag is handled.
+ */
+public abstract class VersionedIOReadableWritable implements IOReadableWritable, Versioned {
+	// Wire format: a leading int version tag, followed by whatever subclasses write after calling super.write().
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(getVersion());
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		int foundVersion = in.readInt();
+		resolveVersionRead(foundVersion);
+	}
+
+	/**
+	 * Hook to react to the version tag that is found during read. It can also be used to initialize further read
+	 * logic w.r.t. the version at hand.
+	 * The default implementation only checks the found version for compatibility via {@link #isCompatibleVersion(int)}.
+	 *
+	 * @param foundVersion the version found from reading the input stream
+	 * @throws VersionMismatchException if the found version is not compatible with this version
+	 */
+	protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+		if (!isCompatibleVersion(foundVersion)) {
+			int expectedVersion = getVersion();
+			throw new VersionMismatchException(
+					"Incompatible version: found " + foundVersion + ", required " + expectedVersion);
+		}
+	}
+
+	/**
+	 * Checks for compatibility between this and the found version. Subclasses can override this method in case of
+	 * intended backwards compatibility.
+	 *
+	 * @param version version number to compare against.
+	 * @return true, iff this is compatible with the passed version.
+	 */
+	public boolean isCompatibleVersion(int version) {
+		return getVersion() == version;
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index c25f491..46b82c7 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -115,4 +115,8 @@ public class ByteArrayInputStreamWithPos extends InputStream {
public int getPosition() {
return position;
}
+
+	public void setPos(int pos) {
+		this.position = pos; // NOTE(review): pos is not validated against the buffer bounds here
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
index df5b34a..ebaf1b9 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -114,4 +114,8 @@ public class ByteArrayOutputStreamWithPos extends OutputStream {
@Override
public void close() throws IOException {
}
+
+	public byte[] getBuf() {
+		return buffer; // the internal array itself, not a copy: callers see (and can modify) the live buffer
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
new file mode 100644
index 0000000..982e7ff
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxyTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.junit.Assert.fail;
+
+public class TypeSerializerSerializationProxyTest {
+	/** Round-trips {@link IntSerializer} through the proxy and expects an equal serializer after reading. */
+	@Test
+	public void testStateSerializerSerializationProxy() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			proxy.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			proxy.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(serializer, proxy.getTypeSerializer());
+	}
+	/** Reading with a classloader that cannot load the serializer class fails, unless the proxy is told to tolerate it. */
+	@Test
+	public void testStateSerializerSerializationProxyClassNotFound() throws Exception {
+
+		TypeSerializer<?> serializer = IntSerializer.INSTANCE;
+
+		TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
+
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			proxy.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+		// an URLClassLoader without URLs and without parent cannot resolve the serializer's class
+		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null));
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			proxy.read(new DataInputViewStreamWrapper(in));
+			fail("ClassNotFoundException expected, leading to IOException");
+		} catch (IOException expected) {
+			// expected: the missing class surfaces as an IOException from read()
+		}
+		// with 'true' as second constructor argument, the read is expected to succeed with a placeholder serializer
+		proxy = new TypeSerializerSerializationProxy<>(new URLClassLoader(new URL[0], null), true);
+
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			proxy.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertTrue(proxy.getTypeSerializer() instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer);
+		// the placeholder is expected to carry the original Java-serialized bytes of the serializer
+		Assert.assertArrayEquals(
+			InstantiationUtil.serializeObject(serializer),
+			((TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer<?>) proxy.getTypeSerializer()).getActualBytes());
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
new file mode 100644
index 0000000..b7b6d6f
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/io/VersionedIOWriteableTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class VersionedIOWriteableTest {
+
+	@Test
+	public void testReadSameVersion() throws Exception {
+		// data written with version 1 must be readable by a reader that also has version 1
+		String payload = "test";
+
+		TestWriteable testWriteable = new TestWriteable(1, payload);
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			testWriteable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		testWriteable = new TestWriteable(1);
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			testWriteable.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(payload, testWriteable.getData());
+	}
+
+	@Test
+	public void testReadCompatibleVersion() throws Exception {
+		// a version 2 reader that declares all older versions compatible must accept data of version 1
+		String payload = "test";
+
+		TestWriteable testWriteable = new TestWriteable(1, payload);
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			testWriteable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		testWriteable = new TestWriteable(2) {
+			@Override
+			public boolean isCompatibleVersion(int version) {
+				return getVersion() >= version;
+			}
+		};
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			testWriteable.read(new DataInputViewStreamWrapper(in));
+		}
+
+		Assert.assertEquals(payload, testWriteable.getData());
+	}
+
+	@Test
+	public void testReadMismatchVersion() throws Exception {
+		// with the default (exact match) compatibility check, a version 2 reader must reject data of version 1
+		String payload = "test";
+
+		TestWriteable testWriteable = new TestWriteable(1, payload);
+		byte[] serialized;
+		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+			testWriteable.write(new DataOutputViewStreamWrapper(out));
+			serialized = out.toByteArray();
+		}
+
+		testWriteable = new TestWriteable(2);
+		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+			testWriteable.read(new DataInputViewStreamWrapper(in));
+			Assert.fail("Version mismatch expected.");
+		} catch (VersionMismatchException ignored) {
+			// expected: the version check in super.read() fails before the payload is read
+		}
+
+		Assert.assertNull(testWriteable.getData());
+	}
+
+	static class TestWriteable extends VersionedIOReadableWritable {
+
+		private final int version;
+		private String data;
+
+		public TestWriteable(int version) {
+			this(version, null);
+		}
+
+		public TestWriteable(int version, String data) {
+			this.version = version;
+			this.data = data;
+		}
+
+		@Override
+		public int getVersion() {
+			return version;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			super.write(out);
+			out.writeUTF(data);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			super.read(in);
+			this.data = in.readUTF();
+		}
+
+		@Override
+		protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+			super.resolveVersionRead(foundVersion);
+		}
+
+		@Override
+		public boolean isCompatibleVersion(int version) {
+			return super.isCompatibleVersion(version);
+		}
+
+		public String getData() {
+			return data;
+		}
+	}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 5b47362..d7a10d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state;
+import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -52,6 +53,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
private final Collection<OperatorStateHandle> restoreSnapshots;
private final CloseableRegistry closeStreamOnCancelRegistry;
private final JavaSerializer<Serializable> javaSerializer;
+ private final ClassLoader userClassloader;
/**
* Restores a OperatorStateStore (lazily) using the provided snapshots.
@@ -60,21 +62,23 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
*/
public DefaultOperatorStateBackend(
ClassLoader userClassLoader,
- Collection<OperatorStateHandle> restoreSnapshots) {
+ Collection<OperatorStateHandle> restoreSnapshots) throws IOException {
- Preconditions.checkNotNull(userClassLoader);
- this.javaSerializer = new JavaSerializer<>(userClassLoader);
- this.restoreSnapshots = restoreSnapshots;
+ this.userClassloader = Preconditions.checkNotNull(userClassLoader);
+ this.javaSerializer = new JavaSerializer<>();
this.registeredStates = new HashMap<>();
this.closeStreamOnCancelRegistry = new CloseableRegistry();
+ this.restoreSnapshots = restoreSnapshots;
+ restoreState();
}
/**
* Creates an empty OperatorStateStore.
*/
- public DefaultOperatorStateBackend(ClassLoader userClassLoader) {
+ public DefaultOperatorStateBackend(ClassLoader userClassLoader) throws IOException {
this(userClassLoader, null);
}
+
@SuppressWarnings("unchecked")
@Override
public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
@@ -82,8 +86,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
}
@Override
- public <S> ListState<S> getOperatorState(
- ListStateDescriptor<S> stateDescriptor) throws IOException {
+ public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws IOException {
Preconditions.checkNotNull(stateDescriptor);
@@ -95,41 +98,93 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
if (null == partitionableListState) {
- partitionableListState = new PartitionableListState<>(partitionStateSerializer);
-
+ partitionableListState = new PartitionableListState<>(name, partitionStateSerializer);
registeredStates.put(name, partitionableListState);
+ } else {
+ Preconditions.checkState(
+ partitionableListState.getPartitionStateSerializer().
+ isCompatibleWith(stateDescriptor.getSerializer()),
+ "Incompatible type serializers. Provided: " + stateDescriptor.getSerializer() +
+ ", found: " + partitionableListState.getPartitionStateSerializer());
+ }
+
+ return partitionableListState;
+ }
+
+ private void restoreState() throws IOException {
+
+ if (null == restoreSnapshots) {
+ return;
+ }
- // Try to restore previous state if state handles to snapshots are provided
- if (restoreSnapshots != null) {
- for (OperatorStateHandle stateHandle : restoreSnapshots) {
- //TODO we coud be even more gc friendly be removing handles from the collections one the map is empty
- // search and remove to be gc friendly
- long[] offsets = stateHandle.getStateNameToPartitionOffsets().remove(name);
+ for (OperatorStateHandle stateHandle : restoreSnapshots) {
- if (offsets != null) {
+ if (stateHandle == null) {
+ continue;
+ }
+
+ FSDataInputStream in = stateHandle.openInputStream();
+ closeStreamOnCancelRegistry.registerClosable(in);
+
+ ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
- FSDataInputStream in = stateHandle.openInputStream();
- try {
- closeStreamOnCancelRegistry.registerClosable(in);
+ try {
+ Thread.currentThread().setContextClassLoader(userClassloader);
+ OperatorBackendSerializationProxy backendSerializationProxy =
+ new OperatorBackendSerializationProxy(userClassloader);
- DataInputView div = new DataInputViewStreamWrapper(in);
+ backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
- for (int i = 0; i < offsets.length; ++i) {
+ List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ backendSerializationProxy.getNamedStateSerializationProxies();
- in.seek(offsets[i]);
- S partitionState = partitionStateSerializer.deserialize(div);
- partitionableListState.add(partitionState);
- }
- } finally {
- closeStreamOnCancelRegistry.unregisterClosable(in);
- in.close();
- }
+ // Recreate all PartitionableListStates from the meta info
+ for (OperatorBackendSerializationProxy.StateMetaInfo<?> stateMetaInfo : metaInfoList) {
+ PartitionableListState<?> listState = registeredStates.get(stateMetaInfo.getName());
+
+ if (null == listState) {
+ listState = new PartitionableListState<>(
+ stateMetaInfo.getName(),
+ stateMetaInfo.getStateSerializer());
+
+ registeredStates.put(listState.getName(), listState);
+ } else {
+ Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith(
+ stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
+ listState.getPartitionStateSerializer() + " is not compatible with " +
+ stateMetaInfo.getStateSerializer());
}
}
+
+ // Restore all the state in PartitionableListStates
+ for (Map.Entry<String, long[]> nameToOffsets : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
+ PartitionableListState<?> stateListForName = registeredStates.get(nameToOffsets.getKey());
+
+ Preconditions.checkState(null != stateListForName, "Found state without " +
+ "corresponding meta info: " + nameToOffsets.getKey());
+
+ deserializeStateValues(stateListForName, in, nameToOffsets.getValue());
+ }
+
+ } finally {
+ Thread.currentThread().setContextClassLoader(restoreClassLoader);
+ closeStreamOnCancelRegistry.unregisterClosable(in);
+ IOUtils.closeQuietly(in);
}
}
+ }
- return partitionableListState;
+	private static <S> void deserializeStateValues(
+			PartitionableListState<S> stateListForName,
+			FSDataInputStream in,
+			long[] offsets) throws IOException {
+		// each offset is the stream position of one serialized element, which is appended to the list state
+		DataInputView div = new DataInputViewStreamWrapper(in);
+		TypeSerializer<S> serializer = stateListForName.getPartitionStateSerializer();
+		for (long offset : offsets) {
+			in.seek(offset);
+			stateListForName.add(serializer.deserialize(div));
+		}
}
@Override
@@ -140,6 +195,18 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return new DoneFuture<>(null);
}
+ List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
+ new ArrayList<>(registeredStates.size());
+
+ for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
+ PartitionableListState<?> state = entry.getValue();
+ OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
+ new OperatorBackendSerializationProxy.StateMetaInfo<>(
+ state.getName(),
+ state.getPartitionStateSerializer());
+ metaInfoList.add(metaInfo);
+ }
+
Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size());
CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
@@ -150,6 +217,11 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
DataOutputView dov = new DataOutputViewStreamWrapper(out);
+ OperatorBackendSerializationProxy backendSerializationProxy =
+ new OperatorBackendSerializationProxy(metaInfoList);
+
+ backendSerializationProxy.write(dov);
+
dov.writeInt(registeredStates.size());
for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
@@ -171,29 +243,26 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
registeredStates.clear();
}
+	@Override
+	public Set<String> getRegisteredStateNames() {
+		return registeredStates.keySet(); // live view backed by registeredStates, not a snapshot copy
+	}
+
+	@Override
+	public void close() throws IOException {
+		closeStreamOnCancelRegistry.close(); // restoreState() registers its input streams with this registry
+	}
+
static final class PartitionableListState<S> implements ListState<S> {
private final List<S> internalList;
+ private final String name;
private final TypeSerializer<S> partitionStateSerializer;
- public PartitionableListState(TypeSerializer<S> partitionStateSerializer) {
+ public PartitionableListState(String name, TypeSerializer<S> partitionStateSerializer) {
this.internalList = new ArrayList<>();
this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
- }
-
- @Override
- public void clear() {
- internalList.clear();
- }
-
- @Override
- public Iterable<S> get() {
- return internalList;
- }
-
- @Override
- public void add(S value) {
- internalList.add(value);
+ this.name = Preconditions.checkNotNull(name);
}
public long[] write(FSDataOutputStream out) throws IOException {
@@ -215,6 +284,29 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
return internalList;
}
+ public String getName() {
+ return name;
+ }
+
+ public TypeSerializer<S> getPartitionStateSerializer() {
+ return partitionStateSerializer;
+ }
+
+ @Override
+ public void clear() {
+ internalList.clear();
+ }
+
+ @Override
+ public Iterable<S> get() {
+ return internalList;
+ }
+
+ @Override
+ public void add(S value) {
+ internalList.add(value);
+ }
+
@Override
public String toString() {
return "PartitionableListState{" +
@@ -222,16 +314,5 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
'}';
}
}
-
- @Override
- public Set<String> getRegisteredStateNames() {
- return registeredStates.keySet();
- }
-
- @Override
- public void close() throws IOException {
- closeStreamOnCancelRegistry.close();
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
index 2eb9595..512baf6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/JavaSerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
@@ -34,15 +33,7 @@ import java.io.Serializable;
@Internal
final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
- private final ClassLoader userClassLoader;
-
- public JavaSerializer() {
- this(Thread.currentThread().getContextClassLoader());
- }
-
- public JavaSerializer(ClassLoader userClassLoader) {
- this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
- }
+ private static final long serialVersionUID = 5067491650263321234L;
@Override
public boolean isImmutableType() {
@@ -87,7 +78,8 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
@Override
public T deserialize(DataInputView source) throws IOException {
try {
- return InstantiationUtil.deserializeObject(new DataInputViewStream(source), userClassLoader);
+ return InstantiationUtil.deserializeObject(
+ new DataInputViewStream(source), Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Could not deserialize object.", e);
}
@@ -107,7 +99,7 @@ final class JavaSerializer<T extends Serializable> extends TypeSerializer<T> {
@Override
public boolean equals(Object obj) {
- return obj instanceof JavaSerializer && userClassLoader.equals(((JavaSerializer<T>) obj).userClassLoader);
+ return obj instanceof JavaSerializer;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
new file mode 100644
index 0000000..dbee6cb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization proxy for all meta data in keyed state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ */
+public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+
+	private static final int VERSION = 1;
+
+	private TypeSerializerSerializationProxy<?> keySerializerProxy;
+	private List<StateMetaInfo<?, ?>> namedStateSerializationProxies;
+
+	// only set by the read-side constructor; used to resolve the serializers in read()
+	private ClassLoader userCodeClassLoader;
+
+	public KeyedBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+	}
+
+	public KeyedBackendSerializationProxy(TypeSerializer<?> keySerializer, List<StateMetaInfo<?, ?>> namedStateSerializationProxies) {
+		this.keySerializerProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(keySerializer));
+		this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+		Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE,
+				"Too many registered states: " + namedStateSerializationProxies.size());
+	}
+
+	public List<StateMetaInfo<?, ?>> getNamedStateSerializationProxies() {
+		return namedStateSerializationProxies;
+	}
+
+	public TypeSerializerSerializationProxy<?> getKeySerializerProxy() {
+		return keySerializerProxy;
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		super.write(out);
+
+		keySerializerProxy.write(out);
+		// the write-side constructor guarantees that the number of states fits into a short
+		out.writeShort(namedStateSerializationProxies.size());
+
+		for (StateMetaInfo<?, ?> kvState : namedStateSerializationProxies) {
+			kvState.write(out);
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		super.read(in);
+
+		keySerializerProxy = new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+		keySerializerProxy.read(in);
+
+		int numKvStates = in.readShort();
+		namedStateSerializationProxies = new ArrayList<>(numKvStates);
+		for (int i = 0; i < numKvStates; ++i) {
+			StateMetaInfo<?, ?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
+			stateSerializationProxy.read(in);
+			namedStateSerializationProxies.add(stateSerializationProxy);
+		}
+	}
+
+//----------------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * This is the serialization proxy for {@link RegisteredBackendStateMetaInfo} for a single registered state in a
+	 * keyed backend.
+	 */
+	public static class StateMetaInfo<N, S> implements IOReadableWritable {
+
+	private StateDescriptor.Type stateType;
+	private String stateName;
+	private TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy;
+	private TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy;
+
+	private ClassLoader userClassLoader;
+
+	StateMetaInfo(ClassLoader userClassLoader) {
+		this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+	}
+
+	public StateMetaInfo(
+			StateDescriptor.Type stateType,
+			String name,
+			TypeSerializer<N> namespaceSerializer,
+			TypeSerializer<S> stateSerializer) {
+
+		this.stateType = Preconditions.checkNotNull(stateType);
+		this.stateName = Preconditions.checkNotNull(name);
+		this.namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(namespaceSerializer));
+		this.stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(Preconditions.checkNotNull(stateSerializer));
+	}
+
+	public StateDescriptor.Type getStateType() {
+		return stateType;
+	}
+
+	public void setStateType(StateDescriptor.Type stateType) {
+		this.stateType = stateType;
+	}
+
+	public String getStateName() {
+		return stateName;
+	}
+
+	public void setStateName(String stateName) {
+		this.stateName = stateName;
+	}
+
+	public TypeSerializerSerializationProxy<N> getNamespaceSerializerSerializationProxy() {
+		return namespaceSerializerSerializationProxy;
+	}
+
+	public void setNamespaceSerializerSerializationProxy(TypeSerializerSerializationProxy<N> namespaceSerializerSerializationProxy) {
+		this.namespaceSerializerSerializationProxy = namespaceSerializerSerializationProxy;
+	}
+
+	public TypeSerializerSerializationProxy<S> getStateSerializerSerializationProxy() {
+		return stateSerializerSerializationProxy;
+	}
+
+	public void setStateSerializerSerializationProxy(TypeSerializerSerializationProxy<S> stateSerializerSerializationProxy) {
+		this.stateSerializerSerializationProxy = stateSerializerSerializationProxy;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// persisted by ordinal: the order of the StateDescriptor.Type constants is part of the serialized format
+		out.writeInt(getStateType().ordinal());
+		out.writeUTF(getStateName());
+
+		getNamespaceSerializerSerializationProxy().write(out);
+		getStateSerializerSerializationProxy().write(out);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		int enumOrdinal = in.readInt();
+		StateDescriptor.Type[] stateTypes = StateDescriptor.Type.values();
+		if (enumOrdinal < 0 || enumOrdinal >= stateTypes.length) {
+			throw new IOException("Unknown state type ordinal: " + enumOrdinal);
+		}
+		setStateType(stateTypes[enumOrdinal]);
+		setStateName(in.readUTF());
+
+		namespaceSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
+		namespaceSerializerSerializationProxy.read(in);
+
+		stateSerializerSerializationProxy = new TypeSerializerSerializationProxy<>(userClassLoader);
+		stateSerializerSerializationProxy.read(in);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		// the state type is part of the serialized meta info and therefore also part of equality
+		StateMetaInfo<?, ?> that = (StateMetaInfo<?, ?>) o;
+		return stateType == that.stateType
+				&& getStateName().equals(that.getStateName())
+				&& getNamespaceSerializerSerializationProxy().equals(that.getNamespaceSerializerSerializationProxy())
+				&& getStateSerializerSerializationProxy().equals(that.getStateSerializerSerializationProxy());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = getStateName().hashCode();
+		result = 31 * result + getNamespaceSerializerSerializationProxy().hashCode();
+		result = 31 * result + getStateSerializerSerializationProxy().hashCode();
+		return result;
+	}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
new file mode 100644
index 0000000..61df979
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionedIOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state
+ * serialization logic here.
+ *
+ * <p>The written layout is: the version header of {@link VersionedIOReadableWritable}, the number of states as a
+ * {@code short}, and then one {@link StateMetaInfo} per state.
+ *
+ * <p>Instances created with {@link #OperatorBackendSerializationProxy(List)} are meant for writing; instances created
+ * with {@link #OperatorBackendSerializationProxy(ClassLoader)} are meant to be populated via
+ * {@link #read(DataInputView)}.
+ */
+public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
+
+ private static final int VERSION = 1;
+
+ /** Meta info of all operator states, either given for writing or populated by {@link #read(DataInputView)}. */
+ private List<StateMetaInfo<?>> namedStateSerializationProxies;
+
+ /** Class loader used to deserialize the state serializers; only set on the read path. */
+ private ClassLoader userCodeClassLoader;
+
+ /**
+ * Creates a proxy that is populated by {@link #read(DataInputView)}.
+ *
+ * @param userCodeClassLoader class loader used to deserialize the state serializers, must not be null
+ */
+ public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
+ this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+ }
+
+ /**
+ * Creates a proxy that writes the given state meta info.
+ *
+ * @param namedStateSerializationProxies meta info of all states, must not be null; at most {@link Short#MAX_VALUE}
+ * entries because the count is written as a {@code short}
+ */
+ public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> namedStateSerializationProxies) {
+ this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
+ Preconditions.checkArgument(
+ namedStateSerializationProxies.size() <= Short.MAX_VALUE,
+ "Too many operator states: %s. Currently at most %s states are supported.",
+ namedStateSerializationProxies.size(),
+ Short.MAX_VALUE);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+
+ // The write-side constructor checks that the number of states fits into a short.
+ out.writeShort(namedStateSerializationProxies.size());
+
+ for (StateMetaInfo<?> stateMetaInfo : namedStateSerializationProxies) {
+ stateMetaInfo.write(out);
+ }
+ }
+
+ /**
+ * Reads the version header, the number of states and the meta info of every state.
+ *
+ * @param in the input view to read from
+ * @throws IOException if reading fails or the stored number of states is negative
+ */
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ int numStates = in.readShort();
+ if (numStates < 0) {
+ throw new IOException("Corrupt operator state meta data: negative number of states " + numStates + '.');
+ }
+
+ namedStateSerializationProxies = new ArrayList<>(numStates);
+ for (int i = 0; i < numStates; ++i) {
+ StateMetaInfo<?> stateMetaInfo = new StateMetaInfo<>(userCodeClassLoader);
+ stateMetaInfo.read(in);
+ namedStateSerializationProxies.add(stateMetaInfo);
+ }
+ }
+
+ /**
+ * Returns the meta info of all states. This is the list given to the write-side constructor, or the list
+ * populated by {@link #read(DataInputView)} (null before reading).
+ */
+ public List<StateMetaInfo<?>> getNamedStateSerializationProxies() {
+ return namedStateSerializationProxies;
+ }
+
+ //----------------------------------------------------------------------------------------------------------------------
+
+ /**
+ * Meta information of a single operator state: its name and the serializer of its state.
+ *
+ * @param <S> type handled by the state serializer
+ */
+ public static class StateMetaInfo<S> implements IOReadableWritable {
+
+ private String name;
+ private TypeSerializer<S> stateSerializer;
+
+ /** Class loader used to deserialize the state serializer; only set on the read path. */
+ private ClassLoader userClassLoader;
+
+ /** Creates an empty meta info that is populated by {@link #read(DataInputView)}. */
+ private StateMetaInfo(ClassLoader userClassLoader) {
+ this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
+ }
+
+ /**
+ * Creates a meta info for writing.
+ *
+ * @param name name of the state, must not be null
+ * @param stateSerializer serializer of the state, must not be null
+ */
+ public StateMetaInfo(String name, TypeSerializer<S> stateSerializer) {
+ this.name = Preconditions.checkNotNull(name);
+ this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ public void setStateSerializer(TypeSerializer<S> stateSerializer) {
+ this.stateSerializer = stateSerializer;
+ }
+
+ /**
+ * Writes the state name via {@code writeUTF}, followed by the state serializer serialized with
+ * {@link InstantiationUtil#serializeObject}.
+ */
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ out.writeUTF(getName());
+ DataOutputViewStream dos = new DataOutputViewStream(out);
+ InstantiationUtil.serializeObject(dos, getStateSerializer());
+ }
+
+ /**
+ * Reads the state name and then the state serializer, resolving the serializer's classes with the user class
+ * loader.
+ *
+ * @throws IOException if reading fails or a class of the serializer cannot be found
+ */
+ @Override
+ public void read(DataInputView in) throws IOException {
+ setName(in.readUTF());
+ DataInputViewStream dis = new DataInputViewStream(in);
+ try {
+ TypeSerializer<S> deserializedSerializer = InstantiationUtil.deserializeObject(dis, userClassLoader);
+ setStateSerializer(deserializedSerializer);
+ } catch (ClassNotFoundException exception) {
+ throw new IOException(
+ "Could not find a class of the serializer for operator state '" + getName() + "'.", exception);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
new file mode 100644
index 0000000..62418c3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredBackendStateMetaInfo<N, S> {
+
+ private final StateDescriptor.Type stateType;
+ private final String name;
+ private final TypeSerializer<N> namespaceSerializer;
+ private final TypeSerializer<S> stateSerializer;
+
+ /**
+ * Creates the meta info from a {@link KeyedBackendSerializationProxy.StateMetaInfo}, unwrapping the namespace and
+ * state serializers from their serialization proxies.
+ *
+ * @param metaInfoProxy the serialization proxy holding state type, name and serializer proxies
+ */
+ public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, S> metaInfoProxy) {
+ this.stateType = metaInfoProxy.getStateType();
+ this.name = metaInfoProxy.getStateName();
+ this.namespaceSerializer = metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer();
+ this.stateSerializer = metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer();
+ }
+
+ /**
+ * @param stateType type of the state, must not be null; may be {@link StateDescriptor.Type#UNKNOWN}, which
+ * {@link #isCompatibleWith} treats as matching any type
+ * @param name name of the state, must not be null
+ * @param namespaceSerializer serializer for the namespace
+ * @param stateSerializer serializer for the state value
+ */
+ public RegisteredBackendStateMetaInfo(
+ StateDescriptor.Type stateType,
+ String name,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer) {
+
+ this.stateType = Preconditions.checkNotNull(stateType);
+ this.name = Preconditions.checkNotNull(name);
+ this.namespaceSerializer = namespaceSerializer;
+ this.stateSerializer = stateSerializer;
+ }
+
+ public StateDescriptor.Type getStateType() {
+ return stateType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ /**
+ * Checks whether this meta info is compatible with {@code other}. The names must be equal. The state types must be
+ * equal unless either side is {@link StateDescriptor.Type#UNKNOWN}. The namespace and state serializers must be
+ * compatible according to {@link TypeSerializer#isCompatibleWith}.
+ *
+ * <p>Note: unlike {@link #equals(Object)}, this method does not tolerate {@code null} serializers on this instance.
+ *
+ * @param other the meta info to check against, may be null
+ * @return true if compatible, false otherwise (including when {@code other} is null)
+ */
+ public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) {
+
+ if (this == other) {
+ return true;
+ }
+
+ if (null == other) {
+ return false;
+ }
+
+ if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
+ && !other.stateType.equals(StateDescriptor.Type.UNKNOWN)
+ && !stateType.equals(other.stateType)) {
+ return false;
+ }
+
+ if (!name.equals(other.getName())) {
+ return false;
+ }
+
+ return namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+ && stateSerializer.isCompatibleWith(other.stateSerializer);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RegisteredBackendStateMetaInfo<?, ?> that = (RegisteredBackendStateMetaInfo<?, ?>) o;
+
+ if (!stateType.equals(that.stateType)) {
+ return false;
+ }
+
+ if (!getName().equals(that.getName())) {
+ return false;
+ }
+
+ if (getNamespaceSerializer() != null ? !getNamespaceSerializer().equals(that.getNamespaceSerializer()) : that.getNamespaceSerializer() != null) {
+ return false;
+ }
+ return getStateSerializer() != null ? getStateSerializer().equals(that.getStateSerializer()) : that.getStateSerializer() == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getStateType().hashCode();
+ result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
+ result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
+ return result;
+ }
+
+ /**
+ * Human-readable description of all fields. Keyed state backends embed this in error messages, e.g. when state is
+ * accessed with incompatible meta info, so it must identify the state and its serializers.
+ */
+ @Override
+ public String toString() {
+ return "RegisteredBackendStateMetaInfo{" +
+ "stateType=" + stateType +
+ ", name='" + name + '\'' +
+ ", namespaceSerializer=" + namespaceSerializer +
+ ", stateSerializer=" + stateSerializer +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21d1d8b4/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6e85b72..d07901b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -49,6 +50,8 @@ import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -61,6 +64,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
@@ -122,53 +126,66 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// ------------------------------------------------------------------------
// state backend operations
// ------------------------------------------------------------------------
+
+ /**
+ * Looks up the state table registered under the descriptor's name and registers or validates it with meta info
+ * built from the descriptor's type, name and serializer plus the given namespace serializer.
+ */
@SuppressWarnings("unchecked")
- @Override
- public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
- StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateDesc.getName());
+ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc) {
+
+ String name = stateDesc.getName();
+ StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(name);
+
+ RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, stateDesc.getSerializer());
+
+ return tryRegisterStateTable(stateTable, newMetaInfo);
+ }
+
+ /**
+ * Registers a new state table for {@code newMetaInfo} if {@code stateTable} is null. Otherwise it checks that the
+ * existing table's meta info is compatible with {@code newMetaInfo} and, if so, replaces the table's meta info
+ * with {@code newMetaInfo}.
+ *
+ * @throws RuntimeException if an existing table's meta info is incompatible with {@code newMetaInfo}
+ */
+ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
+ StateTable<K, N, V> stateTable, RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
if (stateTable == null) {
- stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
+ stateTable = new StateTable<>(newMetaInfo, keyGroupRange);
+ stateTables.put(newMetaInfo.getName(), stateTable);
+ } else {
+ // The message concatenates the meta info objects, so its readability depends on
+ // RegisteredBackendStateMetaInfo#toString().
+ if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+ throw new RuntimeException("Trying to access state using incompatible meta info, was " +
+ stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+ }
+ // Compatible: the table keeps its data but from now on carries the meta info of the current access.
+ stateTable.setMetaInfo(newMetaInfo);
}
+ return stateTable;
+ }
+ @SuppressWarnings("unchecked")
+ @Override
+ public <N, V> ValueState<V> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<V> stateDesc) throws Exception {
+ // Registers a new table or validates the existing one against the descriptor's meta info.
+ StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@SuppressWarnings("unchecked")
@Override
public <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception {
- StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(stateDesc.getName());
+ String name = stateDesc.getName();
+ StateTable<K, N, ArrayList<T>> stateTable = (StateTable<K, N, ArrayList<T>>) stateTables.get(name);
- if (stateTable == null) {
- stateTable = new StateTable<>(new ArrayListSerializer<>(stateDesc.getSerializer()), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
- }
+ // The table stores ArrayList<T> values, so the descriptor's element serializer is wrapped in an
+ // ArrayListSerializer. The descriptor-based tryRegisterStateTable overload would use the element serializer
+ // as-is, which is why the meta info is built here explicitly.
+ RegisteredBackendStateMetaInfo<N, ArrayList<T>> newMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(stateDesc.getType(), name, namespaceSerializer, new ArrayListSerializer<>(stateDesc.getSerializer()));
+ stateTable = tryRegisterStateTable(stateTable, newMetaInfo);
return new HeapListState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
+
@SuppressWarnings("unchecked")
@Override
public <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception {
- StateTable<K, N, T> stateTable = (StateTable<K, N, T>) stateTables.get(stateDesc.getName());
-
- if (stateTable == null) {
- stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
- }
-
+ // Registers a new table or validates the existing one against the descriptor's meta info.
+ StateTable<K, N, T> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapReducingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@SuppressWarnings("unchecked")
@Override
protected <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
- StateTable<K, N, ACC> stateTable = (StateTable<K, N, ACC>) stateTables.get(stateDesc.getName());
-
- if (stateTable == null) {
- stateTable = new StateTable<>(stateDesc.getSerializer(), namespaceSerializer, keyGroupRange);
- stateTables.put(stateDesc.getName(), stateTable);
- }
-
+ // The table's value type is the accumulator type ACC; registration/validation uses the descriptor's meta info.
+ StateTable<K, N, ACC> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);
return new HeapFoldingState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);
}
@@ -192,23 +209,28 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Too many KV-States: " + stateTables.size() +
". Currently at most " + Short.MAX_VALUE + " states are supported");
- outView.writeShort(stateTables.size());
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- outView.writeUTF(kvState.getKey());
-
- TypeSerializer<?> namespaceSerializer = kvState.getValue().getNamespaceSerializer();
- TypeSerializer<?> stateSerializer = kvState.getValue().getStateSerializer();
-
- InstantiationUtil.serializeObject(stream, namespaceSerializer);
- InstantiationUtil.serializeObject(stream, stateSerializer);
+ RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
+ KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
+ metaInfo.getStateType(),
+ metaInfo.getName(),
+ metaInfo.getNamespaceSerializer(),
+ metaInfo.getStateSerializer());
+ metaInfoProxyList.add(metaInfoProxy);
kVStateToId.put(kvState.getKey(), kVStateToId.size());
}
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+
+ serializationProxy.write(outView);
+
int offsetCounter = 0;
long[] keyGroupRangeOffsets = new long[keyGroupRange.getNumberOfKeyGroups()];
@@ -278,23 +300,27 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
- int numKvStates = inView.readShort();
+ KeyedBackendSerializationProxy serializationProxy =
+ new KeyedBackendSerializationProxy(userCodeClassLoader);
- for (int i = 0; i < numKvStates; ++i) {
- String stateName = inView.readUTF();
+ serializationProxy.read(inView);
- TypeSerializer<?> namespaceSerializer =
- InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
- TypeSerializer<?> stateSerializer =
- InstantiationUtil.deserializeObject(fsDataInputStream, userCodeClassLoader);
+ List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
+ serializationProxy.getNamedStateSerializationProxies();
- StateTable<K, ?, ?> stateTable = stateTables.get(stateName);
+ for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+
+ StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
//important: only create a new table we did not already create it previously
if (null == stateTable) {
- stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
- stateTables.put(stateName, stateTable);
- kvStatesById.put(numRegisteredKvStates, stateName);
+
+ RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+
+ stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
+ stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
+ kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
++numRegisteredKvStates;
}
}
@@ -307,7 +333,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int writtenKeyGroupIndex = inView.readInt();
assert writtenKeyGroupIndex == keyGroupIndex;
- for (int i = 0; i < numKvStates; i++) {
+ for (int i = 0; i < metaInfoList.size(); i++) {
int kvStateId = inView.readShort();
byte isPresent = inView.readByte();
@@ -419,11 +445,18 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
rawResultMap.put(VoidNamespace.INSTANCE, nullNameSpaceFix);
}
- StateTable<K, ?, ?> stateTable = new StateTable<>(stateSerializer, namespaceSerializer, keyGroupRange);
+ RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
+ new RegisteredBackendStateMetaInfo<>(
+ StateDescriptor.Type.UNKNOWN,
+ nameToState.getKey(),
+ namespaceSerializer,
+ stateSerializer);
+
+ StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
stateTable.getState().set(0, rawResultMap);
// add named state to the backend
- getStateTables().put(nameToState.getKey(), stateTable);
+ getStateTables().put(registeredBackendStateMetaInfo.getName(), stateTable);
}
}