Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/07/11 15:14:16 UTC
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
GitHub user StefanRRichter opened a pull request:
https://github.com/apache/flink/pull/6308
[FLINK-9799] Generalize and unify state meta info snapshot
## What is the purpose of the change
This PR generalizes and unifies the de/serialization of state meta information in backends. We replace the snapshots and reader/writers of the individual state types with a general `StateMetaInfoSnapshot` and the corresponding `StateMetaInfoSnapshotReadersWriters`. Backwards compatibility is maintained.
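To make the idea concrete, below is a minimal, self-contained sketch of the pattern: one generic meta info snapshot, written with a leading format version and read back by a reader that checks that version. All names (`GenericMetaInfoSnapshot`, `MetaInfoIO`, the option key in the usage) are hypothetical and only illustrate the approach. They are not Flink's actual `StateMetaInfoSnapshot` or `StateMetaInfoSnapshotReadersWriters` API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical generic snapshot: a state name, a backend state type and free-form string options. */
final class GenericMetaInfoSnapshot {
    enum BackendStateType { KEY_VALUE, OPERATOR }

    final String name;
    final BackendStateType type;
    final Map<String, String> options;

    GenericMetaInfoSnapshot(String name, BackendStateType type, Map<String, String> options) {
        this.name = Objects.requireNonNull(name);
        this.type = Objects.requireNonNull(type);
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof GenericMetaInfoSnapshot)) {
            return false;
        }
        GenericMetaInfoSnapshot other = (GenericMetaInfoSnapshot) o;
        return name.equals(other.name) && type == other.type && options.equals(other.options);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, type, options);
    }
}

/** Hypothetical single writer/reader pair that replaces per-state-type serialization code. */
final class MetaInfoIO {
    static final int CURRENT_VERSION = 5;

    static void write(GenericMetaInfoSnapshot snapshot, DataOutputStream out) throws IOException {
        out.writeInt(CURRENT_VERSION); // version first, so a reader can dispatch on it
        out.writeUTF(snapshot.name);
        out.writeInt(snapshot.type.ordinal());
        out.writeInt(snapshot.options.size());
        for (Map.Entry<String, String> e : snapshot.options.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    static GenericMetaInfoSnapshot read(DataInputStream in) throws IOException {
        int version = in.readInt();
        if (version != CURRENT_VERSION) {
            // A real implementation would hand older versions to legacy readers instead of failing.
            throw new IOException("Unsupported meta info version: " + version);
        }
        String name = in.readUTF();
        GenericMetaInfoSnapshot.BackendStateType type =
            GenericMetaInfoSnapshot.BackendStateType.values()[in.readInt()];
        int size = in.readInt();
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < size; i++) {
            options.put(in.readUTF(), in.readUTF());
        }
        return new GenericMetaInfoSnapshot(name, type, options);
    }

    static byte[] toBytes(GenericMetaInfoSnapshot snapshot) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(snapshot, out);
        }
        return bytes.toByteArray();
    }

    static GenericMetaInfoSnapshot fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return read(in);
        }
    }
}
```

Because every state type goes through the same writer and reader, supporting a new state type means adding options rather than a new snapshot class and serializer pair.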
## Verifying this change
This change is already covered by existing tests.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/StefanRRichter/flink FLINK-9799-generalize-state-meta-pr
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6308.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6308
----
commit 5e44f759342793f4532e99f8df589c2416402176
Author: Stefan Richter <s....@...>
Date: 2018-07-11T09:11:11Z
[FLINK-9799][state] Generalize and unify state meta infos
----
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r202047403
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---
@@ -126,11 +131,11 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
Assert.assertTrue(serializationProxy.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
- for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : serializationProxy.getStateMetaInfoSnapshots()) {
- Assert.assertTrue(meta.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
- Assert.assertTrue(meta.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
- Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot());
- Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot());
+ for (StateMetaInfoSnapshot meta : serializationProxy.getStateMetaInfoSnapshots()) {
--- End diff --
👍 Good point
---
[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6308
CC @azagrebin
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r201778299
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
@@ -98,8 +103,7 @@ public int getVersion() {
@Override
public int[] getCompatibleVersions() {
- // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
- return new int[] {VERSION, 3, 2, 1};
+ return new int[]{VERSION, 4, 3, 2, 1};
--- End diff --
nit: missing a ' ' between '[]' and '{'.
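For context on what `getCompatibleVersions()` is used for: a versioned proxy reads the version number stored at the head of the stream and rejects it unless it appears in this list. A minimal sketch of such a check, with hypothetical names (`VersionedProxySketch`, `KeyedProxySketch`, `checkReadVersion`) that are not Flink's implementation, and assuming `VERSION` is 5 (the diff implies it is greater than 4 but does not show it):

```java
import java.util.Arrays;

/** Hypothetical versioned proxy that accepts any version listed as compatible. */
abstract class VersionedProxySketch {

    abstract int getVersion();

    abstract int[] getCompatibleVersions();

    /** Throws if a version read from the stream is not among the compatible versions. */
    final void checkReadVersion(int readVersion) {
        for (int compatible : getCompatibleVersions()) {
            if (compatible == readVersion) {
                return;
            }
        }
        throw new IllegalStateException("Incompatible version " + readVersion
            + "; compatible versions are " + Arrays.toString(getCompatibleVersions()));
    }
}

/** Mirrors the diff: the current version plus 4, 3, 2 and 1. VERSION = 5 is an assumption. */
final class KeyedProxySketch extends VersionedProxySketch {
    static final int VERSION = 5;

    @Override
    int getVersion() {
        return VERSION;
    }

    @Override
    int[] getCompatibleVersions() {
        return new int[] {VERSION, 4, 3, 2, 1};
    }
}
```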
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6308
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r202035286
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---
@@ -126,11 +131,11 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
Assert.assertTrue(serializationProxy.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
- for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : serializationProxy.getStateMetaInfoSnapshots()) {
- Assert.assertTrue(meta.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
- Assert.assertTrue(meta.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
- Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot());
- Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot());
+ for (StateMetaInfoSnapshot meta : serializationProxy.getStateMetaInfoSnapshots()) {
--- End diff --
A change like
`RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta -> RegisteredKeyedBackendStateMetaInfo(StateMetaInfoSnapshot)`
followed by the original assertions might give a more complete version of the test.
The same applies to the other meta info wrappers in the later tests.
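The suggestion could look roughly like this: convert the generic snapshot back into a typed meta info object so the test keeps its original, field-specific assertions. The classes `GenericSnapshot` and `TypedKeyedMetaInfo` and the serializer keys are hypothetical stand-ins, not Flink's `StateMetaInfoSnapshot` or `RegisteredKeyedBackendStateMetaInfo`; serializers are typed as `Object` only to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical generic snapshot: the state name plus serializers keyed by their role. */
final class GenericSnapshot {
    final String name;
    final Map<String, Object> serializers;

    GenericSnapshot(String name, Map<String, Object> serializers) {
        this.name = name;
        this.serializers = new HashMap<>(serializers);
    }
}

/** Hypothetical typed view that restores the accessors the original assertions relied on. */
final class TypedKeyedMetaInfo {
    static final String NAMESPACE_SERIALIZER = "NAMESPACE_SERIALIZER";
    static final String VALUE_SERIALIZER = "VALUE_SERIALIZER";

    private final String name;
    private final Object namespaceSerializer;
    private final Object stateSerializer;

    /** Builds the typed view from a generic snapshot, failing fast if a serializer is missing. */
    TypedKeyedMetaInfo(GenericSnapshot snapshot) {
        this.name = snapshot.name;
        this.namespaceSerializer = Objects.requireNonNull(
            snapshot.serializers.get(NAMESPACE_SERIALIZER), "missing namespace serializer");
        this.stateSerializer = Objects.requireNonNull(
            snapshot.serializers.get(VALUE_SERIALIZER), "missing state serializer");
    }

    String getName() {
        return name;
    }

    Object getNamespaceSerializer() {
        return namespaceSerializer;
    }

    Object getStateSerializer() {
        return stateSerializer;
    }
}
```

In the real test the serializers would be `TypeSerializer<?>` instances, and the loop body could then keep the original `instanceof UnloadableDummyTypeSerializer` checks on the typed accessors.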
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r202047581
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
@@ -98,8 +103,7 @@ public int getVersion() {
@Override
public int[] getCompatibleVersions() {
- // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
- return new int[] {VERSION, 3, 2, 1};
+ return new int[]{VERSION, 4, 3, 2, 1};
--- End diff --
Both styles are ok and used in Flink, so I will stick to this.
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r202033136
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static factory that gives out the writer and readers for different versions of {@link StateMetaInfoSnapshot}.
+ */
+public class StateMetaInfoSnapshotReadersWriters {
+
+ /**
+ * Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}.
+ * - v5: Flink 1.6.x
+ */
+ public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
+
+ /**
+ * Enum for backwards compatibility. This gives a hint about the expected state type for which a
+ * {@link StateMetaInfoSnapshot} should be deserialized.
+ *
+ * TODO this can go away after we eventually drop backwards compatibility with all versions < 5.
+ */
+ public enum StateTypeHint {
+ KEYED_STATE,
+ OPERATOR_STATE
+ }
+
+ /**
+ * Returns the writer for {@link StateMetaInfoSnapshot}.
+ */
+ @Nonnull
+ public static StateMetaInfoWriter getWriter() {
+ return CurrentWriterImpl.INSTANCE;
+ }
+
+ /**
+ * Returns a reader for {@link StateMetaInfoSnapshot} with the requested state type and version number.
+ *
+ * @param readVersion the format version to read.
+ * @param stateTypeHint a hint about the expected type to read.
+ * @return the requested reader.
+ */
+ @Nonnull
+ public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
+
+ if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ // latest version shortcut
+ return CurrentReaderImpl.INSTANCE;
+ }
+
+ if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
+ }
+
+ switch (stateTypeHint) {
--- End diff --
One small suggestion here: move all the legacy code into a separate package/class and leave only a call to a legacy reader factory here. That would slim this class down a bit and make it simpler to remove the legacy code later.
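A minimal sketch of the suggested split, assuming a separate holder class for all pre-v5 readers. `ReaderFactorySketch`, `LegacyReadersSketch` and the string-returning `MetaInfoReaderSketch` interface are hypothetical placeholders, not code from the PR. The main factory keeps only the latest-version shortcut and the upper-bound check, and delegates everything older:

```java
/** Hypothetical reader abstraction; a real reader would deserialize from a DataInputView. */
interface MetaInfoReaderSketch {
    String describe();
}

enum StateTypeHint { KEYED_STATE, OPERATOR_STATE }

/** Main factory: knows only the current format and delegates older ones. */
final class ReaderFactorySketch {
    static final int CURRENT_VERSION = 5;

    private static final MetaInfoReaderSketch CURRENT_READER = () -> "current v" + CURRENT_VERSION;

    static MetaInfoReaderSketch getReader(int readVersion, StateTypeHint hint) {
        if (readVersion == CURRENT_VERSION) {
            return CURRENT_READER; // latest version shortcut
        }
        if (readVersion > CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
        }
        return LegacyReadersSketch.getReader(readVersion, hint);
    }
}

/** All pre-v5 handling lives here, so it can be deleted in one place later. */
final class LegacyReadersSketch {
    private LegacyReadersSketch() {
    }

    static MetaInfoReaderSketch getReader(int readVersion, StateTypeHint hint) {
        switch (hint) {
            case KEYED_STATE:
                return () -> "legacy keyed v" + readVersion;
            case OPERATOR_STATE:
                return () -> "legacy operator v" + readVersion;
            default:
                throw new IllegalArgumentException("Unknown state type hint: " + hint);
        }
    }
}
```

Once backwards compatibility with versions below 5 is dropped, `LegacyReadersSketch` and the `StateTypeHint` parameter could be removed without touching the current-format path.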
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r202041046
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java ---
@@ -0,0 +1,412 @@
+ @Nonnull
+ public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
+
+ if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ // latest version shortcut
+ return CurrentReaderImpl.INSTANCE;
+ }
+
+ if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+ throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
+ }
+
+ switch (stateTypeHint) {
--- End diff --
👍
---
[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6308
@sihuazhou @azagrebin thanks guys for the fast reviews! Will address the comments and merge.
---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:
https://github.com/apache/flink/pull/6308#discussion_r201798531
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -62,62 +67,62 @@ public int getVersion() {
@Override
public int[] getCompatibleVersions() {
- // we are compatible with version 3 (Flink 1.5.x), 2 (Flink 1.4.x, Flink 1.3.x) and version 1 (Flink 1.2.x)
--- End diff --
was this comment redundant?
---