You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/07 19:47:01 UTC
[3/8] flink git commit: [FLINK-6178] [core] Allow serializer upgrades
for managed state
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
index d571dcc..91d7aab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
@@ -18,15 +18,10 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.VersionMismatchException;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -34,23 +29,29 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state
+ * Serialization proxy for all meta data in operator state backends. In the future we might also migrate the actual state
* serialization logic here.
*/
public class OperatorBackendSerializationProxy extends VersionedIOReadableWritable {
- private static final int VERSION = 1;
+ public static final int VERSION = 2;
- private List<StateMetaInfo<?>> namedStateSerializationProxies;
+ private List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots;
private ClassLoader userCodeClassLoader;
+ private int restoredVersion;
+
public OperatorBackendSerializationProxy(ClassLoader userCodeClassLoader) {
this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
}
- public OperatorBackendSerializationProxy(List<StateMetaInfo<?>> namedStateSerializationProxies) {
- this.namedStateSerializationProxies = Preconditions.checkNotNull(namedStateSerializationProxies);
- Preconditions.checkArgument(namedStateSerializationProxies.size() <= Short.MAX_VALUE);
+ public OperatorBackendSerializationProxy(
+ List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots) {
+
+ this.stateMetaInfoSnapshots = Preconditions.checkNotNull(stateMetaInfoSnapshots);
+ Preconditions.checkArgument(stateMetaInfoSnapshots.size() <= Short.MAX_VALUE);
+
+ this.restoredVersion = VERSION;
}
@Override
@@ -59,129 +60,44 @@ public class OperatorBackendSerializationProxy extends VersionedIOReadableWritab
}
@Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
-
- out.writeShort(namedStateSerializationProxies.size());
-
- for (StateMetaInfo<?> kvState : namedStateSerializationProxies) {
- kvState.write(out);
- }
+ protected void resolveVersionRead(int foundVersion) throws VersionMismatchException {
+ super.resolveVersionRead(foundVersion);
+ this.restoredVersion = foundVersion;
}
@Override
- public void read(DataInputView out) throws IOException {
- super.read(out);
-
- int numKvStates = out.readShort();
- namedStateSerializationProxies = new ArrayList<>(numKvStates);
- for (int i = 0; i < numKvStates; ++i) {
- StateMetaInfo<?> stateSerializationProxy = new StateMetaInfo<>(userCodeClassLoader);
- stateSerializationProxy.read(out);
- namedStateSerializationProxies.add(stateSerializationProxy);
- }
+ public boolean isCompatibleVersion(int version) {
+ // we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
+ return super.isCompatibleVersion(version) || version == 1;
}
- public List<StateMetaInfo<?>> getNamedStateSerializationProxies() {
- return namedStateSerializationProxies;
- }
-
- //----------------------------------------------------------------------------------------------------------------------
-
- public static class StateMetaInfo<S> implements IOReadableWritable {
-
- private String name;
- private TypeSerializer<S> stateSerializer;
- private OperatorStateHandle.Mode mode;
-
- private ClassLoader userClassLoader;
-
- @VisibleForTesting
- public StateMetaInfo(ClassLoader userClassLoader) {
- this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
- }
-
- public StateMetaInfo(String name, TypeSerializer<S> stateSerializer, OperatorStateHandle.Mode mode) {
- this.name = Preconditions.checkNotNull(name);
- this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
- this.mode = Preconditions.checkNotNull(mode);
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public TypeSerializer<S> getStateSerializer() {
- return stateSerializer;
- }
-
- public void setStateSerializer(TypeSerializer<S> stateSerializer) {
- this.stateSerializer = stateSerializer;
- }
-
- public OperatorStateHandle.Mode getMode() {
- return mode;
- }
-
- public void setMode(OperatorStateHandle.Mode mode) {
- this.mode = mode;
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- out.writeUTF(getName());
- out.writeByte(getMode().ordinal());
- DataOutputViewStream dos = new DataOutputViewStream(out);
- InstantiationUtil.serializeObject(dos, getStateSerializer());
- }
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
- @Override
- public void read(DataInputView in) throws IOException {
- setName(in.readUTF());
- setMode(OperatorStateHandle.Mode.values()[in.readByte()]);
- DataInputViewStream dis = new DataInputViewStream(in);
- try {
- TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userClassLoader);
- setStateSerializer(stateSerializer);
- } catch (ClassNotFoundException exception) {
- throw new IOException(exception);
- }
+ out.writeShort(stateMetaInfoSnapshots.size());
+ for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> kvState : stateMetaInfoSnapshots) {
+ OperatorBackendStateMetaInfoSnapshotReaderWriters
+ .getWriterForVersion(VERSION, kvState)
+ .writeStateMetaInfo(out);
}
+ }
- @Override
- public boolean equals(Object o) {
-
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StateMetaInfo<?> metaInfo = (StateMetaInfo<?>) o;
-
- if (!getName().equals(metaInfo.getName())) {
- return false;
- }
-
- if (!getStateSerializer().equals(metaInfo.getStateSerializer())) {
- return false;
- }
-
- return getMode() == metaInfo.getMode();
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+
+ int numKvStates = in.readShort();
+ stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+ for (int i = 0; i < numKvStates; i++) {
+ stateMetaInfoSnapshots.add(
+ OperatorBackendStateMetaInfoSnapshotReaderWriters
+ .getReaderForVersion(restoredVersion, userCodeClassLoader)
+ .readStateMetaInfo(in));
}
+ }
- @Override
- public int hashCode() {
- int result = getName().hashCode();
- result = 31 * result + getStateSerializer().hashCode();
- result = 31 * result + getMode().hashCode();
- return result;
- }
+ public List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> getStateMetaInfoSnapshots() {
+ return stateMetaInfoSnapshots;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
new file mode 100644
index 0000000..9ab106b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
+import org.apache.flink.api.common.typeutils.TypeSerializerUtil;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * Readers and writers for different versions of the {@link RegisteredOperatorBackendStateMetaInfo.Snapshot}.
+ * Outdated formats are also kept here to document the history of the serialization format.
+ */
+public class OperatorBackendStateMetaInfoSnapshotReaderWriters {
+
+ // -------------------------------------------------------------------------------
+ // Writers
+ // - v1: Flink 1.2.x
+ // - v2: Flink 1.3.x
+ // -------------------------------------------------------------------------------
+
+ public static <S> OperatorBackendStateMetaInfoWriter getWriterForVersion(
+ int version, RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+
+ switch (version) {
+ case 1:
+ return new OperatorBackendStateMetaInfoWriterV1<>(stateMetaInfo);
+
+ // current version
+ case OperatorBackendSerializationProxy.VERSION:
+ return new OperatorBackendStateMetaInfoWriterV2<>(stateMetaInfo);
+
+ default:
+ // guard for future
+ throw new IllegalStateException(
+ "Unrecognized operator backend state meta info writer version: " + version);
+ }
+ }
+
+ public interface OperatorBackendStateMetaInfoWriter {
+ void writeStateMetaInfo(DataOutputView out) throws IOException;
+ }
+
+ public static abstract class AbstractOperatorBackendStateMetaInfoWriter<S>
+ implements OperatorBackendStateMetaInfoWriter {
+
+ protected final RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo;
+
+ public AbstractOperatorBackendStateMetaInfoWriter(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+ this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
+ }
+ }
+
+ public static class OperatorBackendStateMetaInfoWriterV1<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> {
+
+ public OperatorBackendStateMetaInfoWriterV1(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+ super(stateMetaInfo);
+ }
+
+ @Override
+ public void writeStateMetaInfo(DataOutputView out) throws IOException {
+ out.writeUTF(stateMetaInfo.getName());
+ out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
+ new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(out);
+ }
+ }
+
+ public static class OperatorBackendStateMetaInfoWriterV2<S> extends AbstractOperatorBackendStateMetaInfoWriter<S> {
+
+ public OperatorBackendStateMetaInfoWriterV2(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo) {
+ super(stateMetaInfo);
+ }
+
+ @Override
+ public void writeStateMetaInfo(DataOutputView out) throws IOException {
+ out.writeUTF(stateMetaInfo.getName());
+ out.writeByte(stateMetaInfo.getAssignmentMode().ordinal());
+
+ // write in a way that allows us to be fault-tolerant and skip blocks in the case of java serialization failures
+ try (
+ ByteArrayOutputStreamWithPos outWithPos = new ByteArrayOutputStreamWithPos();
+ DataOutputViewStreamWrapper outViewWrapper = new DataOutputViewStreamWrapper(outWithPos)) {
+
+ new TypeSerializerSerializationProxy<>(stateMetaInfo.getPartitionStateSerializer()).write(outViewWrapper);
+
+ // write the start offset of the config snapshot
+ out.writeInt(outWithPos.getPosition());
+ TypeSerializerUtil.writeSerializerConfigSnapshot(
+ outViewWrapper,
+ stateMetaInfo.getPartitionStateSerializerConfigSnapshot());
+
+ // write the total number of bytes and flush
+ out.writeInt(outWithPos.getPosition());
+ out.write(outWithPos.getBuf(), 0, outWithPos.getPosition());
+ }
+ }
+ }
+
+ // -------------------------------------------------------------------------------
+ // Readers
+ // - v1: Flink 1.2.x
+ // - v2: Flink 1.3.x
+ // -------------------------------------------------------------------------------
+
+ public static <S> OperatorBackendStateMetaInfoReader<S> getReaderForVersion(
+ int version, ClassLoader userCodeClassLoader) {
+
+ switch (version) {
+ case 1:
+ return new OperatorBackendStateMetaInfoReaderV1<>(userCodeClassLoader);
+
+ // current version
+ case OperatorBackendSerializationProxy.VERSION:
+ return new OperatorBackendStateMetaInfoReaderV2<>(userCodeClassLoader);
+
+ default:
+ // guard for future
+ throw new IllegalStateException(
+ "Unrecognized operator backend state meta info reader version: " + version);
+ }
+ }
+
+ public interface OperatorBackendStateMetaInfoReader<S> {
+ RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException;
+ }
+
+ public static abstract class AbstractOperatorBackendStateMetaInfoReader<S>
+ implements OperatorBackendStateMetaInfoReader<S> {
+
+ protected final ClassLoader userCodeClassLoader;
+
+ public AbstractOperatorBackendStateMetaInfoReader(ClassLoader userCodeClassLoader) {
+ this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+ }
+ }
+
+ public static class OperatorBackendStateMetaInfoReaderV1<S> extends AbstractOperatorBackendStateMetaInfoReader<S> {
+
+ public OperatorBackendStateMetaInfoReaderV1(ClassLoader userCodeClassLoader) {
+ super(userCodeClassLoader);
+ }
+
+ @Override
+ public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException {
+ RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
+ new RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
+
+ stateMetaInfo.setName(in.readUTF());
+ stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+ DataInputViewStream dis = new DataInputViewStream(in);
+ try {
+ TypeSerializer<S> stateSerializer = InstantiationUtil.deserializeObject(dis, userCodeClassLoader);
+ stateMetaInfo.setPartitionStateSerializer(stateSerializer);
+ } catch (ClassNotFoundException exception) {
+ throw new IOException(exception);
+ }
+
+ // old versions do not contain the partition state serializer's configuration snapshot
+ stateMetaInfo.setPartitionStateSerializerConfigSnapshot(null);
+
+ return stateMetaInfo;
+ }
+ }
+
+ public static class OperatorBackendStateMetaInfoReaderV2<S> extends AbstractOperatorBackendStateMetaInfoReader<S> {
+
+ public OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
+ super(userCodeClassLoader);
+ }
+
+ @Override
+ public RegisteredOperatorBackendStateMetaInfo.Snapshot<S> readStateMetaInfo(DataInputView in) throws IOException {
+ RegisteredOperatorBackendStateMetaInfo.Snapshot<S> stateMetaInfo =
+ new RegisteredOperatorBackendStateMetaInfo.Snapshot<>();
+
+ stateMetaInfo.setName(in.readUTF());
+ stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+ // read start offset of configuration snapshot
+ int configSnapshotStartOffset = in.readInt();
+
+ int totalBytes = in.readInt();
+
+ byte[] buffer = new byte[totalBytes];
+ in.readFully(buffer);
+
+ ByteArrayInputStreamWithPos inWithPos = new ByteArrayInputStreamWithPos(buffer);
+ DataInputViewStreamWrapper inViewWrapper = new DataInputViewStreamWrapper(inWithPos);
+
+ try {
+ final TypeSerializerSerializationProxy<S> partitionStateSerializerProxy =
+ new TypeSerializerSerializationProxy<>(userCodeClassLoader);
+ partitionStateSerializerProxy.read(inViewWrapper);
+ stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer());
+ } catch (IOException e) {
+ stateMetaInfo.setPartitionStateSerializer(null);
+ }
+
+ // make sure we start from the config snapshot's start offset, even if reading the partition state serializer failed midway
+ inWithPos.setPosition(configSnapshotStartOffset);
+ stateMetaInfo.setPartitionStateSerializerConfigSnapshot(
+ TypeSerializerUtil.readSerializerConfigSnapshot(inViewWrapper, userCodeClassLoader));
+
+ return stateMetaInfo;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
deleted file mode 100644
index 0d4b3c8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
- * state name.
- *
- * @param <N> Type of namespace
- * @param <S> Type of state value
- */
-public class RegisteredBackendStateMetaInfo<N, S> {
-
- private final StateDescriptor.Type stateType;
- private final String name;
- private final TypeSerializer<N> namespaceSerializer;
- private final TypeSerializer<S> stateSerializer;
-
- public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N, S> metaInfoProxy) {
- this(
- metaInfoProxy.getStateType(),
- metaInfoProxy.getStateName(),
- metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer(),
- metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer());
- }
-
- public RegisteredBackendStateMetaInfo(
- StateDescriptor.Type stateType,
- String name,
- TypeSerializer<N> namespaceSerializer,
- TypeSerializer<S> stateSerializer) {
-
- this.stateType = checkNotNull(stateType);
- this.name = checkNotNull(name);
- this.namespaceSerializer = checkNotNull(namespaceSerializer);
- this.stateSerializer = checkNotNull(stateSerializer);
- }
-
- public StateDescriptor.Type getStateType() {
- return stateType;
- }
-
- public String getName() {
- return name;
- }
-
- public TypeSerializer<N> getNamespaceSerializer() {
- return namespaceSerializer;
- }
-
- public TypeSerializer<S> getStateSerializer() {
- return stateSerializer;
- }
-
- public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> other) {
-
- if (this == other) {
- return true;
- }
-
- if (null == other) {
- return false;
- }
-
- if (!stateType.equals(StateDescriptor.Type.UNKNOWN)
- && !other.stateType.equals(StateDescriptor.Type.UNKNOWN)
- && !stateType.equals(other.stateType)) {
- return false;
- }
-
- if (!name.equals(other.getName())) {
- return false;
- }
-
- return (stateSerializer.canRestoreFrom(other.stateSerializer)) &&
- (namespaceSerializer.canRestoreFrom(other.namespaceSerializer)
- // we also check if there is just a migration proxy that should be replaced by any real serializer
- || other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- RegisteredBackendStateMetaInfo<?, ?> that = (RegisteredBackendStateMetaInfo<?, ?>) o;
-
- if (!stateType.equals(that.stateType)) {
- return false;
- }
-
- if (!getName().equals(that.getName())) {
- return false;
- }
-
- return getStateSerializer().equals(that.getStateSerializer())
- && getNamespaceSerializer().equals(that.getNamespaceSerializer());
- }
-
- @Override
- public String toString() {
- return "RegisteredBackendStateMetaInfo{" +
- "stateType=" + stateType +
- ", name='" + name + '\'' +
- ", namespaceSerializer=" + namespaceSerializer +
- ", stateSerializer=" + stateSerializer +
- '}';
- }
-
- @Override
- public int hashCode() {
- int result = getName().hashCode();
- result = 31 * result + getStateType().hashCode();
- result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
- result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
new file mode 100644
index 0000000..e1a7e06
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredKeyedBackendStateMetaInfo.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Compound meta information for a registered state in a keyed state backend. This combines all serializers and the
+ * state name.
+ *
+ * @param <N> Type of namespace
+ * @param <S> Type of state value
+ */
+public class RegisteredKeyedBackendStateMetaInfo<N, S> {
+
+ private final StateDescriptor.Type stateType;
+ private final String name;
+ private final TypeSerializer<N> namespaceSerializer;
+ private final TypeSerializer<S> stateSerializer;
+
+ public RegisteredKeyedBackendStateMetaInfo(
+ StateDescriptor.Type stateType,
+ String name,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer) {
+
+ this.stateType = checkNotNull(stateType);
+ this.name = checkNotNull(name);
+ this.namespaceSerializer = checkNotNull(namespaceSerializer);
+ this.stateSerializer = checkNotNull(stateSerializer);
+ }
+
+ public StateDescriptor.Type getStateType() {
+ return stateType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ public Snapshot<N, S> snapshot() {
+ return new Snapshot<>(
+ stateType,
+ name,
+ namespaceSerializer.duplicate(),
+ stateSerializer.duplicate(),
+ namespaceSerializer.snapshotConfiguration(),
+ stateSerializer.snapshotConfiguration());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ RegisteredKeyedBackendStateMetaInfo<?, ?> that = (RegisteredKeyedBackendStateMetaInfo<?, ?>) o;
+
+ if (!stateType.equals(that.stateType)) {
+ return false;
+ }
+
+ if (!getName().equals(that.getName())) {
+ return false;
+ }
+
+ return getStateSerializer().equals(that.getStateSerializer())
+ && getNamespaceSerializer().equals(that.getNamespaceSerializer());
+ }
+
+ @Override
+ public String toString() {
+ return "RegisteredKeyedBackendStateMetaInfo{" +
+ "stateType=" + stateType +
+ ", name='" + name + '\'' +
+ ", namespaceSerializer=" + namespaceSerializer +
+ ", stateSerializer=" + stateSerializer +
+ '}';
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getStateType().hashCode();
+ result = 31 * result + getNamespaceSerializer().hashCode();
+ result = 31 * result + getStateSerializer().hashCode();
+ return result;
+ }
+
+ /**
+ * A consistent snapshot of a {@link RegisteredKeyedBackendStateMetaInfo}.
+ */
+ public static class Snapshot<N, S> {
+
+ private StateDescriptor.Type stateType;
+ private String name;
+ private TypeSerializer<N> namespaceSerializer;
+ private TypeSerializer<S> stateSerializer;
+ private TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot;
+ private TypeSerializerConfigSnapshot stateSerializerConfigSnapshot;
+
+ /** Empty constructor used when restoring the state meta info snapshot. */
+ Snapshot() {}
+
+ private Snapshot(
+ StateDescriptor.Type stateType,
+ String name,
+ TypeSerializer<N> namespaceSerializer,
+ TypeSerializer<S> stateSerializer,
+ TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot,
+ TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) {
+
+ this.stateType = Preconditions.checkNotNull(stateType);
+ this.name = Preconditions.checkNotNull(name);
+ this.namespaceSerializer = Preconditions.checkNotNull(namespaceSerializer);
+ this.stateSerializer = Preconditions.checkNotNull(stateSerializer);
+ this.namespaceSerializerConfigSnapshot = Preconditions.checkNotNull(namespaceSerializerConfigSnapshot);
+ this.stateSerializerConfigSnapshot = Preconditions.checkNotNull(stateSerializerConfigSnapshot);
+ }
+
+ public StateDescriptor.Type getStateType() {
+ return stateType;
+ }
+
+ void setStateType(StateDescriptor.Type stateType) {
+ this.stateType = stateType;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ void setName(String name) {
+ this.name = name;
+ }
+
+ public TypeSerializer<N> getNamespaceSerializer() {
+ return namespaceSerializer;
+ }
+
+ void setNamespaceSerializer(TypeSerializer<N> namespaceSerializer) {
+ this.namespaceSerializer = namespaceSerializer;
+ }
+
+ public TypeSerializer<S> getStateSerializer() {
+ return stateSerializer;
+ }
+
+ void setStateSerializer(TypeSerializer<S> stateSerializer) {
+ this.stateSerializer = stateSerializer;
+ }
+
+ public TypeSerializerConfigSnapshot getNamespaceSerializerConfigSnapshot() {
+ return namespaceSerializerConfigSnapshot;
+ }
+
+ void setNamespaceSerializerConfigSnapshot(TypeSerializerConfigSnapshot namespaceSerializerConfigSnapshot) {
+ this.namespaceSerializerConfigSnapshot = namespaceSerializerConfigSnapshot;
+ }
+
+ public TypeSerializerConfigSnapshot getStateSerializerConfigSnapshot() {
+ return stateSerializerConfigSnapshot;
+ }
+
+ void setStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot stateSerializerConfigSnapshot) {
+ this.stateSerializerConfigSnapshot = stateSerializerConfigSnapshot;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Snapshot<?, ?> that = (Snapshot<?, ?>) o;
+
+ if (!stateType.equals(that.stateType)) {
+ return false;
+ }
+
+ if (!getName().equals(that.getName())) {
+ return false;
+ }
+
+ // need to check for nulls because serializer and config snapshots may be null on restore
+ return
+ ((getStateSerializer() == null && that.getStateSerializer() == null)
+ || getStateSerializer().equals(that.getStateSerializer()))
+ && ((getNamespaceSerializer() == null && that.getNamespaceSerializer() == null)
+ || getNamespaceSerializer().equals(that.getNamespaceSerializer()))
+ && ((getNamespaceSerializerConfigSnapshot() == null && that.getNamespaceSerializerConfigSnapshot() == null)
+ || getNamespaceSerializerConfigSnapshot().equals(that.getNamespaceSerializerConfigSnapshot()))
+ && ((getStateSerializerConfigSnapshot() == null && that.getStateSerializerConfigSnapshot() == null)
+ || getStateSerializerConfigSnapshot().equals(that.getStateSerializerConfigSnapshot()));
+ }
+
+ @Override
+ public int hashCode() {
+ // need to check for nulls because serializer and config snapshots may be null on restore
+ int result = getName().hashCode();
+ result = 31 * result + getStateType().hashCode();
+ result = 31 * result + (getNamespaceSerializer() != null ? getNamespaceSerializer().hashCode() : 0);
+ result = 31 * result + (getStateSerializer() != null ? getStateSerializer().hashCode() : 0);
+ result = 31 * result + (getNamespaceSerializerConfigSnapshot() != null ? getNamespaceSerializerConfigSnapshot().hashCode() : 0);
+ result = 31 * result + (getStateSerializerConfigSnapshot() != null ? getStateSerializerConfigSnapshot().hashCode() : 0);
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
new file mode 100644
index 0000000..b43fc9c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredOperatorBackendStateMetaInfo.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Compound meta information for a registered state in an operator state backend.
+ * This contains the state name, assignment mode, and state partition serializer.
+ *
+ * @param <S> Type of the state.
+ */
+public class RegisteredOperatorBackendStateMetaInfo<S> {
+
+ /**
+ * The name of the state, as registered by the user.
+ */
+ private final String name;
+
+ /**
+ * The mode by which elements in this state are assigned to tasks during restore.
+ */
+ private final OperatorStateHandle.Mode assignmentMode;
+
+ /**
+ * The type serializer for the elements in the state list.
+ */
+ private final TypeSerializer<S> partitionStateSerializer;
+
+ public RegisteredOperatorBackendStateMetaInfo(
+ String name,
+ TypeSerializer<S> partitionStateSerializer,
+ OperatorStateHandle.Mode assignmentMode) {
+
+ this.name = Preconditions.checkNotNull(name);
+ this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+ this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public OperatorStateHandle.Mode getAssignmentMode() {
+ return assignmentMode;
+ }
+
+ public TypeSerializer<S> getPartitionStateSerializer() {
+ return partitionStateSerializer;
+ }
+
+ public Snapshot<S> snapshot() {
+ return new Snapshot<>(
+ name,
+ assignmentMode,
+ partitionStateSerializer.duplicate(),
+ partitionStateSerializer.snapshotConfiguration());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ return (obj instanceof RegisteredOperatorBackendStateMetaInfo)
+ && name.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getName())
+ && assignmentMode.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getAssignmentMode())
+ && partitionStateSerializer.equals(((RegisteredOperatorBackendStateMetaInfo) obj).getPartitionStateSerializer());
+ }
+
+ @Override
+ public int hashCode() {
+ int result = getName().hashCode();
+ result = 31 * result + getAssignmentMode().hashCode();
+ result = 31 * result + getPartitionStateSerializer().hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RegisteredOperatorBackendStateMetaInfo{" +
+ "name='" + name + "\'" +
+ ", assignmentMode=" + assignmentMode +
+ ", partitionStateSerializer=" + partitionStateSerializer +
+ '}';
+ }
+
+ /**
+ * A consistent snapshot of a {@link RegisteredOperatorBackendStateMetaInfo}; its serializer and config snapshot may be null on restore.
+ */
+ public static class Snapshot<S> {
+
+ private String name;
+ private OperatorStateHandle.Mode assignmentMode;
+ private TypeSerializer<S> partitionStateSerializer;
+ private TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot;
+
+ /** Empty constructor used when restoring the state meta info snapshot; fields are then populated through the setters. */
+ Snapshot() {}
+
+ private Snapshot(
+ String name,
+ OperatorStateHandle.Mode assignmentMode,
+ TypeSerializer<S> partitionStateSerializer,
+ TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) {
+
+ this.name = Preconditions.checkNotNull(name);
+ this.assignmentMode = Preconditions.checkNotNull(assignmentMode);
+ this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+ this.partitionStateSerializerConfigSnapshot = Preconditions.checkNotNull(partitionStateSerializerConfigSnapshot);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ void setName(String name) {
+ this.name = name;
+ }
+
+ public OperatorStateHandle.Mode getAssignmentMode() {
+ return assignmentMode;
+ }
+
+ void setAssignmentMode(OperatorStateHandle.Mode assignmentMode) {
+ this.assignmentMode = assignmentMode;
+ }
+
+ public TypeSerializer<S> getPartitionStateSerializer() {
+ return partitionStateSerializer;
+ }
+
+ void setPartitionStateSerializer(TypeSerializer<S> partitionStateSerializer) {
+ this.partitionStateSerializer = partitionStateSerializer;
+ }
+
+ public TypeSerializerConfigSnapshot getPartitionStateSerializerConfigSnapshot() {
+ return partitionStateSerializerConfigSnapshot;
+ }
+
+ void setPartitionStateSerializerConfigSnapshot(TypeSerializerConfigSnapshot partitionStateSerializerConfigSnapshot) {
+ this.partitionStateSerializerConfigSnapshot = partitionStateSerializerConfigSnapshot;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ // serializer and config snapshot may be null on restore, possibly on one side only, so compare them null-safely instead of dereferencing
+ return (obj instanceof Snapshot)
+ && name.equals(((Snapshot) obj).getName())
+ && assignmentMode.equals(((Snapshot) obj).getAssignmentMode())
+ && (partitionStateSerializer == null ? ((Snapshot) obj).getPartitionStateSerializer() == null
+ : partitionStateSerializer.equals(((Snapshot) obj).getPartitionStateSerializer()))
+ && (partitionStateSerializerConfigSnapshot == null ? ((Snapshot) obj).getPartitionStateSerializerConfigSnapshot() == null
+ : partitionStateSerializerConfigSnapshot.equals(((Snapshot) obj).getPartitionStateSerializerConfigSnapshot()));
+ }
+
+ @Override
+ public int hashCode() {
+ // need to check for nulls because serializer and config snapshots may be null on restore
+ int result = getName().hashCode();
+ result = 31 * result + getAssignmentMode().hashCode();
+ result = 31 * result + (getPartitionStateSerializer() != null ? getPartitionStateSerializer().hashCode() : 0);
+ result = 31 * result + (getPartitionStateSerializerConfigSnapshot() != null ? getPartitionStateSerializerConfigSnapshot().hashCode() : 0);
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
new file mode 100644
index 0000000..978f28d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateMigrationUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * Utilities related to state migration, commonly used in the state backends.
+ */
+public class StateMigrationUtil {
+
+ /**
+ * Resolves the final compatibility result of two serializers by taking into account compound information,
+ * including the preceding serializer, the preceding serializer's configuration snapshot, and the new serializer.
+ *
+ * <p>The final result is determined as follows:
+ * 1. If there is no configuration snapshot of the preceding serializer,
+ * assumes the new serializer to be compatible.
+ * 2. Otherwise, check the new serializer against the configuration snapshot via {@code ensureCompatibility}.
+ * 3. If that result does not require migration, just return it as the result.
+ * 4. If migration is required, check if the preceding serializer is valid (non-null and not of the dummy class).
+ * If yes, use that as the convert deserializer for state migration.
+ * 5. If the preceding serializer is not valid, check if the result came with a convert deserializer.
+ * If yes, use that for state migration and simply return the result.
+ * 6. If all of the above fail, state migration is required but could not be performed; throw an exception.
+ *
+ * @param precedingSerializer the preceding serializer used to write the data
+ * @param dummySerializerClassTag class whose instances are dummy placeholders; a preceding serializer of exactly this class is not used
+ * @param precedingSerializerConfigSnapshot configuration snapshot of the preceding serializer
+ * @param newSerializer the new serializer to ensure compatibility with
+ *
+ * @param <T> Type of the data handled by the serializers
+ * @return the final resolved compatibility result
+ * @throws RuntimeException if migration is required but neither a valid preceding serializer nor a convert deserializer is available
+ */
+ public static <T> CompatibilityResult<T> resolveCompatibilityResult(
+ TypeSerializer<T> precedingSerializer,
+ Class<?> dummySerializerClassTag,
+ TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
+ TypeSerializer<T> newSerializer) {
+
+ if (precedingSerializerConfigSnapshot != null) {
+ CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);
+
+ if (!initialResult.requiresMigration()) {
+ return initialResult;
+ } else {
+ if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
+ // if the preceding serializer exists and is not a dummy, use
+ // that for converting instead of the provided convert deserializer
+ return CompatibilityResult.requiresMigration(precedingSerializer);
+ } else if (initialResult.getConvertDeserializer() != null) {
+ return initialResult;
+ } else {
+ throw new RuntimeException(
+ "State migration required, but there is no available serializer capable of reading previous data.");
+ }
+ }
+ } else {
+ // if the configuration snapshot of the preceding serializer cannot be provided,
+ // we can only simply assume that the new serializer is compatible
+ return CompatibilityResult.compatible();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
index 8b58891..2800899 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceSerializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
import java.io.IOException;
@@ -89,4 +90,11 @@ public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidN
public boolean canEqual(Object obj) {
return obj instanceof VoidNamespaceSerializer;
}
+
+ @Override
+ protected boolean isCompatibleSerializationFormatIdentifier(String identifier) {
+ // we might be replacing a migration namespace serializer, in which case we just assume compatibility
+ return super.isCompatibleSerializationFormatIdentifier(identifier)
+ || identifier.equals(MigrationNamespaceSerializerProxy.class.getCanonicalName());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
index d63b6d3..7b61da1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
@@ -196,7 +196,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
* @param keyContext the key context.
* @param metaInfo the meta information, including the type serializer for state copy-on-write.
*/
- CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
this(keyContext, metaInfo, 1024);
}
@@ -209,7 +209,7 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
* @throws IllegalArgumentException when the capacity is less than zero.
*/
@SuppressWarnings("unchecked")
- private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo, int capacity) {
+ private CopyOnWriteStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo, int capacity) {
super(keyContext, metaInfo);
// initialized tables to EMPTY_TABLE.
@@ -532,12 +532,12 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
}
@Override
- public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+ public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
return metaInfo;
}
@Override
- public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
this.metaInfo = metaInfo;
}
@@ -1063,4 +1063,4 @@ public class CopyOnWriteStateTable<K, N, S> extends StateTable<K, N, S> implemen
throw new UnsupportedOperationException("Read-only iterator");
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aecc72e..866ed28 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -132,8 +132,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<N> namespaceSerializer,
TypeSerializer<V> valueSerializer) {
- final RegisteredBackendStateMetaInfo<N, V> newMetaInfo =
- new RegisteredBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
+ final RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo =
+ new RegisteredKeyedBackendStateMetaInfo<>(stateType, stateName, namespaceSerializer, valueSerializer);
@SuppressWarnings("unchecked")
StateTable<K, N, V> stateTable = (StateTable<K, N, V>) stateTables.get(stateName);
@@ -142,12 +142,27 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateTable = newStateTable(newMetaInfo);
stateTables.put(stateName, stateTable);
} else {
- if (!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) {
- throw new RuntimeException("Trying to access state using incompatible meta info, was " +
- stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
+ // TODO with eager registration in place, these checks should be moved to restorePartitionedState()
+
+ Preconditions.checkState(
+ stateName.equals(stateTable.getMetaInfo().getName()),
+ "Incompatible state names. " +
+ "Was [" + stateTable.getMetaInfo().getName() + "], " +
+ "registered with [" + newMetaInfo.getName() + "].");
+
+ if (!newMetaInfo.getStateType().equals(StateDescriptor.Type.UNKNOWN)
+ && !stateTable.getMetaInfo().getStateType().equals(StateDescriptor.Type.UNKNOWN)) {
+
+ Preconditions.checkState(
+ newMetaInfo.getStateType().equals(stateTable.getMetaInfo().getStateType()),
+ "Incompatible state types. " +
+ "Was [" + stateTable.getMetaInfo().getStateType() + "], " +
+ "registered with [" + newMetaInfo.getStateType() + "].");
}
+
stateTable.setMetaInfo(newMetaInfo);
}
+
return stateTable;
}
@@ -240,21 +255,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
"Too many KV-States: " + stateTables.size() +
". Currently at most " + Short.MAX_VALUE + " states are supported");
- List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoProxyList = new ArrayList<>(stateTables.size());
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots = new ArrayList<>(stateTables.size());
final Map<String, Integer> kVStateToId = new HashMap<>(stateTables.size());
final Map<StateTable<K, ?, ?>, StateTableSnapshot> cowStateStableSnapshots = new HashedMap(stateTables.size());
for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
- RegisteredBackendStateMetaInfo<?, ?> metaInfo = kvState.getValue().getMetaInfo();
- KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy = new KeyedBackendSerializationProxy.StateMetaInfo(
- metaInfo.getStateType(),
- metaInfo.getName(),
- metaInfo.getNamespaceSerializer(),
- metaInfo.getStateSerializer());
-
- metaInfoProxyList.add(metaInfoProxy);
+ metaInfoSnapshots.add(kvState.getValue().getMetaInfo().snapshot());
kVStateToId.put(kvState.getKey(), kVStateToId.size());
StateTable<K, ?, ?> stateTable = kvState.getValue();
if (null != stateTable) {
@@ -263,7 +271,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
final KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, metaInfoProxyList);
+ new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
//--------------------------------------------------- this becomes the end of sync part
@@ -376,23 +384,29 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.read(inView);
- List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> metaInfoList =
- serializationProxy.getNamedStateSerializationProxies();
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
+ serializationProxy.getStateMetaInfoSnapshots();
- for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoSerializationProxy : metaInfoList) {
+ for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
- StateTable<K, ?, ?> stateTable = stateTables.get(metaInfoSerializationProxy.getStateName());
+ StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName());
//important: only create a new table we did not already create it previously
if (null == stateTable) {
- RegisteredBackendStateMetaInfo<?, ?> registeredBackendStateMetaInfo =
- new RegisteredBackendStateMetaInfo<>(metaInfoSerializationProxy);
+ RegisteredKeyedBackendStateMetaInfo<?, ?> registeredKeyedBackendStateMetaInfo =
+ new RegisteredKeyedBackendStateMetaInfo<>(
+ restoredMetaInfo.getStateType(),
+ restoredMetaInfo.getName(),
+ restoredMetaInfo.getNamespaceSerializer(),
+ restoredMetaInfo.getStateSerializer());
- stateTable = newStateTable(registeredBackendStateMetaInfo);
- stateTables.put(metaInfoSerializationProxy.getStateName(), stateTable);
- kvStatesById.put(numRegisteredKvStates, metaInfoSerializationProxy.getStateName());
+ stateTable = newStateTable(registeredKeyedBackendStateMetaInfo);
+ stateTables.put(restoredMetaInfo.getName(), stateTable);
+ kvStatesById.put(numRegisteredKvStates, restoredMetaInfo.getName());
++numRegisteredKvStates;
+ } else {
+ // TODO with eager state registration in place, check here for serializer migration strategies
}
}
@@ -410,7 +424,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
"Unexpected key-group in restore.");
- for (int i = 0; i < metaInfoList.size(); i++) {
+ for (int i = 0; i < restoredMetaInfos.size(); i++) {
int kvStateId = inView.readShort();
StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId));
@@ -509,7 +523,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return sum;
}
- public <N, V> StateTable<K, N, V> newStateTable(RegisteredBackendStateMetaInfo<N, V> newMetaInfo) {
+ public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyedBackendStateMetaInfo<N, V> newMetaInfo) {
return asynchronousSnapshots ?
new CopyOnWriteStateTable<>(this, newMetaInfo) :
new NestedMapsStateTable<>(this, newMetaInfo);
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 22f344d..75c31db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.Preconditions;
@@ -63,7 +63,7 @@ public class NestedMapsStateTable<K, N, S> extends StateTable<K, N, S> {
* @param keyContext the key context.
* @param metaInfo the meta information for this state table.
*/
- public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ public NestedMapsStateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
super(keyContext, metaInfo);
this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup();
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 62fc869..c1cdcc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.util.Preconditions;
@@ -42,14 +42,14 @@ public abstract class StateTable<K, N, S> {
/**
* Combined meta information such as name and serializers for this state
*/
- protected RegisteredBackendStateMetaInfo<N, S> metaInfo;
+ protected RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo;
/**
*
* @param keyContext the key context provides the key scope for all put/get/delete operations.
* @param metaInfo the meta information, including the type serializer for state copy-on-write.
*/
- public StateTable(InternalKeyContext<K> keyContext, RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ public StateTable(InternalKeyContext<K> keyContext, RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
this.keyContext = Preconditions.checkNotNull(keyContext);
this.metaInfo = Preconditions.checkNotNull(metaInfo);
}
@@ -168,11 +168,11 @@ public abstract class StateTable<K, N, S> {
return metaInfo.getNamespaceSerializer();
}
- public RegisteredBackendStateMetaInfo<N, S> getMetaInfo() {
+ public RegisteredKeyedBackendStateMetaInfo<N, S> getMetaInfo() {
return metaInfo;
}
- public void setMetaInfo(RegisteredBackendStateMetaInfo<N, S> metaInfo) {
+ public void setMetaInfo(RegisteredKeyedBackendStateMetaInfo<N, S> metaInfo) {
this.metaInfo = metaInfo;
}
@@ -186,4 +186,4 @@ public abstract class StateTable<K, N, S> {
@VisibleForTesting
public abstract int sizeOfNamespace(Object namespace);
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index 53ec349..d7bc94e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -47,7 +47,8 @@ class StateTableByKeyGroupReaders {
case 1:
return new StateTableByKeyGroupReaderV1<>(table);
case 2:
- return new StateTableByKeyGroupReaderV2<>(table);
+ case 3:
+ return new StateTableByKeyGroupReaderV2V3<>(table);
default:
throw new IllegalArgumentException("Unknown version: " + version);
}
@@ -110,10 +111,10 @@ class StateTableByKeyGroupReaders {
}
}
- private static final class StateTableByKeyGroupReaderV2<K, N, S>
+ private static final class StateTableByKeyGroupReaderV2V3<K, N, S>
extends AbstractStateTableByKeyGroupReader<K, N, S> {
- StateTableByKeyGroupReaderV2(StateTable<K, N, S> stateTable) {
+ StateTableByKeyGroupReaderV2V3(StateTable<K, N, S> stateTable) {
super(stateTable);
}
@@ -133,4 +134,4 @@ class StateTableByKeyGroupReaders {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
index e872526..4bdc5e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import java.util.Arrays;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -127,4 +129,14 @@ public class IntListSerializer extends TypeSerializer<IntList> {
public int hashCode() {
return IntListSerializer.class.hashCode();
}
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompatibilityResult<IntList> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
index e4a9264..0ae5e71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java
@@ -21,7 +21,9 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
@@ -136,4 +138,14 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
return obj.getClass() == IntPairSerializerFactory.class;
};
}
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompatibilityResult<IntPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
index b62b097..17ee5f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
@@ -104,4 +106,14 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
public int hashCode() {
return StringPairSerializer.class.hashCode();
}
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompatibilityResult<StringPair> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index b4d6eb7..8c4e049 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
+import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapValueState;
@@ -270,7 +270,7 @@ public class QueryableStateClientTest {
ValueStateDescriptor<Integer> descriptor =
new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
- RegisteredBackendStateMetaInfo<VoidNamespace, Integer> registeredBackendStateMetaInfo = new RegisteredBackendStateMetaInfo<>(
+ RegisteredKeyedBackendStateMetaInfo<VoidNamespace, Integer> registeredKeyedBackendStateMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
descriptor.getType(),
descriptor.getName(),
VoidNamespaceSerializer.INSTANCE,
@@ -279,7 +279,7 @@ public class QueryableStateClientTest {
// Register state
HeapValueState<Integer, VoidNamespace, Integer> kvState = new HeapValueState<>(
descriptor,
- new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredBackendStateMetaInfo),
+ new NestedMapsStateTable<Integer, VoidNamespace, Integer>(keyedStateBackend, registeredKeyedBackendStateMetaInfo),
IntSerializer.INSTANCE,
VoidNamespaceSerializer.INSTANCE);
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index c04ed8c..50ca159 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -97,7 +97,7 @@ public class OperatorStateBackendTest {
assertEquals(2, operatorStateBackend.getRegisteredStateNames().size());
// make sure that type registrations are forwarded
- TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getPartitionStateSerializer();
+ TypeSerializer<?> serializer = ((PartitionableListState<?>) listState).getStateMetaInfo().getPartitionStateSerializer();
assertTrue(serializer instanceof KryoSerializer);
assertTrue(((KryoSerializer<?>) serializer).getKryo().getSerializer(registeredType)
instanceof com.esotericsoftware.kryo.serializers.JavaSerializer);
http://git-wip-us.apache.org/repos/asf/flink/blob/8aa5e057/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 0dbe2eb..02b4d62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
@@ -29,10 +30,21 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class})
public class SerializationProxiesTest {
@Test
@@ -42,14 +54,14 @@ public class SerializationProxiesTest {
TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
- List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoList = new ArrayList<>();
+ List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoList = new ArrayList<>();
- stateMetaInfoList.add(
- new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer));
- stateMetaInfoList.add(
- new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer));
- stateMetaInfoList.add(
- new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer));
+ stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ StateDescriptor.Type.VALUE, "a", namespaceSerializer, stateSerializer).snapshot());
+ stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ StateDescriptor.Type.VALUE, "b", namespaceSerializer, stateSerializer).snapshot());
+ stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
+ StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
KeyedBackendSerializationProxy serializationProxy =
new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
@@ -67,8 +79,8 @@ public class SerializationProxiesTest {
serializationProxy.read(new DataInputViewStreamWrapper(in));
}
- Assert.assertEquals(keySerializer, serializationProxy.getKeySerializerProxy().getTypeSerializer());
- Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies());
+ Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
+ Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
}
@Test
@@ -78,41 +90,79 @@ public class SerializationProxiesTest {
TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
- KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfo =
- new KeyedBackendSerializationProxy.StateMetaInfo<>(StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer);
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
- metaInfo.write(new DataOutputViewStreamWrapper(out));
+ KeyedBackendStateMetaInfoSnapshotReaderWriters
+ .getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
+ .writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
serialized = out.toByteArray();
}
- metaInfo = new KeyedBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
-
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- metaInfo.read(new DataInputViewStreamWrapper(in));
+ metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters
+ .getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+ .readStateMetaInfo(new DataInputViewStreamWrapper(in));
}
- Assert.assertEquals(name, metaInfo.getStateName());
+ Assert.assertEquals(name, metaInfo.getName());
}
+ @Test
+ public void testKeyedStateMetaInfoReadSerializerFailureResilience() throws Exception {
+ String name = "test";
+ TypeSerializer<?> namespaceSerializer = LongSerializer.INSTANCE;
+ TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+ RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> metaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
+ StateDescriptor.Type.VALUE, name, namespaceSerializer, stateSerializer).snapshot();
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ KeyedBackendStateMetaInfoSnapshotReaderWriters
+ .getWriterForVersion(KeyedBackendSerializationProxy.VERSION, metaInfo)
+ .writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
+ serialized = out.toByteArray();
+ }
+
+ // mock failure when deserializing serializer
+ TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+ doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+ PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ metaInfo = KeyedBackendStateMetaInfoSnapshotReaderWriters
+ .getReaderForVersion(KeyedBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+ .readStateMetaInfo(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertEquals(name, metaInfo.getName());
+ Assert.assertEquals(null, metaInfo.getNamespaceSerializer());
+ Assert.assertEquals(null, metaInfo.getStateSerializer());
+ Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), metaInfo.getNamespaceSerializerConfigSnapshot());
+ Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getStateSerializerConfigSnapshot());
+ }
@Test
public void testOperatorBackendSerializationProxyRoundtrip() throws Exception {
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
- List<OperatorBackendSerializationProxy.StateMetaInfo<?>> stateMetaInfoList = new ArrayList<>();
+ List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> stateMetaInfoSnapshots = new ArrayList<>();
- stateMetaInfoList.add(
- new OperatorBackendSerializationProxy.StateMetaInfo<>("a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
- stateMetaInfoList.add(
- new OperatorBackendSerializationProxy.StateMetaInfo<>("b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
- stateMetaInfoList.add(
- new OperatorBackendSerializationProxy.StateMetaInfo<>("c", stateSerializer, OperatorStateHandle.Mode.BROADCAST));
+ stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+ "a", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
+ stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+ "b", stateSerializer, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE).snapshot());
+ stateMetaInfoSnapshots.add(new RegisteredOperatorBackendStateMetaInfo<>(
+ "c", stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot());
OperatorBackendSerializationProxy serializationProxy =
- new OperatorBackendSerializationProxy(stateMetaInfoList);
+ new OperatorBackendSerializationProxy(stateMetaInfoSnapshots);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -127,7 +177,7 @@ public class SerializationProxiesTest {
serializationProxy.read(new DataInputViewStreamWrapper(in));
}
- Assert.assertEquals(stateMetaInfoList, serializationProxy.getNamedStateSerializationProxies());
+ Assert.assertEquals(stateMetaInfoSnapshots, serializationProxy.getStateMetaInfoSnapshots());
}
@Test
@@ -136,22 +186,60 @@ public class SerializationProxiesTest {
String name = "test";
TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
- OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
- new OperatorBackendSerializationProxy.StateMetaInfo<>(name, stateSerializer, OperatorStateHandle.Mode.BROADCAST);
+ RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+ new RegisteredOperatorBackendStateMetaInfo<>(
+ name, stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot();
+
+ byte[] serialized;
+ try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
+ OperatorBackendStateMetaInfoSnapshotReaderWriters
+ .getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo)
+ .writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
+ serialized = out.toByteArray();
+ }
+
+ try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
+ metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters
+ .getReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+ .readStateMetaInfo(new DataInputViewStreamWrapper(in));
+ }
+
+ Assert.assertEquals(name, metaInfo.getName());
+ }
+
+ @Test
+ public void testOperatorStateMetaInfoReadSerializerFailureResilience() throws Exception {
+ String name = "test";
+ TypeSerializer<?> stateSerializer = DoubleSerializer.INSTANCE;
+
+ RegisteredOperatorBackendStateMetaInfo.Snapshot<?> metaInfo =
+ new RegisteredOperatorBackendStateMetaInfo<>(
+ name, stateSerializer, OperatorStateHandle.Mode.BROADCAST).snapshot();
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
- metaInfo.write(new DataOutputViewStreamWrapper(out));
+ OperatorBackendStateMetaInfoSnapshotReaderWriters
+ .getWriterForVersion(OperatorBackendSerializationProxy.VERSION, metaInfo)
+ .writeStateMetaInfo(new DataOutputViewStreamWrapper(out));
+
serialized = out.toByteArray();
}
- metaInfo = new OperatorBackendSerializationProxy.StateMetaInfo<>(Thread.currentThread().getContextClassLoader());
+ // mock failure when deserializing serializer
+ TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
+ doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
+ PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
- metaInfo.read(new DataInputViewStreamWrapper(in));
+ metaInfo = OperatorBackendStateMetaInfoSnapshotReaderWriters
+ .getReaderForVersion(OperatorBackendSerializationProxy.VERSION, Thread.currentThread().getContextClassLoader())
+ .readStateMetaInfo(new DataInputViewStreamWrapper(in));
}
Assert.assertEquals(name, metaInfo.getName());
+ Assert.assertEquals(null, metaInfo.getPartitionStateSerializer());
+ Assert.assertEquals(stateSerializer.snapshotConfiguration(), metaInfo.getPartitionStateSerializerConfigSnapshot());
}
/**
@@ -171,4 +259,4 @@ public class SerializationProxiesTest {
Assert.assertEquals(5, StateDescriptor.Type.AGGREGATING.ordinal());
Assert.assertEquals(6, StateDescriptor.Type.MAP.ordinal());
}
-}
\ No newline at end of file
+}