Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2018/07/11 15:14:16 UTC

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/6308

    [FLINK-9799] Generalize and unify state meta info snapshot

    ## What is the purpose of the change
    
    This PR generalizes and unifies the de/serialization of state meta information in backends. We replace the snapshots and readers/writers of the individual state types with a general `StateMetaInfoSnapshot` and the corresponding `StateMetaInfoSnapshotReadersWriters`. Backwards compatibility is maintained.
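
The following minimal sketch shows what replacing per-type snapshots with one general structure can look like. It rests on several assumptions. `MetaInfoSnapshotSketch`, its `BackendStateType` values and the string-valued options map are hypothetical simplifications, not Flink's actual `StateMetaInfoSnapshot` API. Plain `java.io` data streams stand in for Flink's `DataOutputView`/`DataInputView`. The idea it illustrates is that a single writer and a single reader can handle every state type, because the type-specific details travel as generic key/value entries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for a unified state meta info snapshot. */
final class MetaInfoSnapshotSketch {

    /** Hypothetical state kinds; the real enum may differ. */
    enum BackendStateType { KEYED, OPERATOR }

    /** Format version written in front of every snapshot in this sketch. */
    static final int VERSION = 5;

    final String name;
    final BackendStateType type;
    /** Type-specific details (e.g. a redistribution mode) as generic key/value entries. */
    final Map<String, String> options;

    MetaInfoSnapshotSketch(String name, BackendStateType type, Map<String, String> options) {
        this.name = name;
        this.type = type;
        this.options = Collections.unmodifiableMap(new LinkedHashMap<>(options));
    }

    /** One writer for every state type: version, name, type, then the option entries. */
    static void write(MetaInfoSnapshotSketch snapshot, DataOutputStream out) throws IOException {
        out.writeInt(VERSION);
        out.writeUTF(snapshot.name);
        out.writeUTF(snapshot.type.name());
        out.writeInt(snapshot.options.size());
        for (Map.Entry<String, String> entry : snapshot.options.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
    }

    /** One reader for every state type; the type comes from the stream, not from the caller. */
    static MetaInfoSnapshotSketch read(DataInputStream in) throws IOException {
        int version = in.readInt();
        if (version != VERSION) {
            throw new IOException("Unsupported version in this sketch: " + version);
        }
        String name = in.readUTF();
        BackendStateType type = BackendStateType.valueOf(in.readUTF());
        int size = in.readInt();
        Map<String, String> options = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            String key = in.readUTF();
            options.put(key, in.readUTF());
        }
        return new MetaInfoSnapshotSketch(name, type, options);
    }

    /** Serializes and deserializes in memory, e.g. for a round-trip test. */
    static MetaInfoSnapshotSketch roundTrip(MetaInfoSnapshotSketch snapshot) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(snapshot, out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return read(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the state type is written into the stream, so the reader needs no external hint. Older formats presumably lacked this information, which is what the `StateTypeHint` enum quoted later in this thread compensates for.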
    
    ## Verifying this change
    
    This change is already covered by existing tests.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink FLINK-9799-generalize-state-meta-pr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6308
    
----
commit 5e44f759342793f4532e99f8df589c2416402176
Author: Stefan Richter <s....@...>
Date:   2018-07-11T09:11:11Z

    [FLINK-9799][state] Generalize and unify state meta infos

----


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r202047403
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---
    @@ -126,11 +131,11 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
     		Assert.assertTrue(serializationProxy.getKeySerializer() instanceof UnloadableDummyTypeSerializer);
     		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
     
    -		for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta : serializationProxy.getStateMetaInfoSnapshots()) {
    -			Assert.assertTrue(meta.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer);
    -			Assert.assertTrue(meta.getStateSerializer() instanceof UnloadableDummyTypeSerializer);
    -			Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot());
    -			Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot());
    +		for (StateMetaInfoSnapshot meta : serializationProxy.getStateMetaInfoSnapshots()) {
    --- End diff --
    
    👍 Good point


---

[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6308
  
    CC @azagrebin 


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by sihuazhou <gi...@git.apache.org>.
Github user sihuazhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r201778299
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
    @@ -98,8 +103,7 @@ public int getVersion() {
     
     	@Override
     	public int[] getCompatibleVersions() {
    -		// we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x)
    -		return new int[] {VERSION, 3, 2, 1};
    +		return new int[]{VERSION, 4, 3, 2, 1};
    --- End diff --
    
    nit: missing a space between `[]` and `{`.
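
The `getCompatibleVersions()` array in the hunk above lists the format versions this proxy accepts on restore. The sketch below shows one simplified way such a list might be consulted. `VersionedProxySketch` and its methods are hypothetical. `VERSION = 5` is an assumption, inferred from `4` being listed separately in the array.

```java
import java.util.Arrays;

/** Hypothetical sketch of a versioned proxy that also accepts older formats. */
final class VersionedProxySketch {

    /** Assumed current version; 4 is listed separately in the hunk, so VERSION must be newer. */
    static final int VERSION = 5;

    /** Same shape as getCompatibleVersions() in the diff. */
    int[] getCompatibleVersions() {
        return new int[]{VERSION, 4, 3, 2, 1};
    }

    /** Returns true if data written with readVersion can be read by this proxy. */
    boolean isCompatible(int readVersion) {
        return Arrays.stream(getCompatibleVersions()).anyMatch(v -> v == readVersion);
    }

    /** Fails fast with a descriptive message when restoring from an unknown version. */
    void checkReadVersion(int readVersion) {
        if (!isCompatible(readVersion)) {
            throw new IllegalStateException("Incompatible version: found " + readVersion
                + ", compatible versions are " + Arrays.toString(getCompatibleVersions()));
        }
    }
}
```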


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6308


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r202035286
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---
    --- End diff --
    
    A change like
    `RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> meta -> RegisteredKeyedBackendStateMetaInfo(StateMetaInfoSnapshot)`
    followed by the original assertions might make a more complete version of the test.
    The same applies to the other meta info wrappers in the subsequent tests.
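
Here is a minimal sketch of the suggested wrapper. It assumes the generic snapshot stores serializers under string keys. `GenericSnapshot`, `KeyedMetaInfoView` and the key names are hypothetical, and `Object` stands in for `TypeSerializer` so the example stays self-contained. The wrapper restores typed getters, so the original per-field assertions can run against a snapshot that was read back through the generic path.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical generic snapshot: serializers are looked up by key instead of typed getters. */
final class GenericSnapshot {
    final String name;
    final Map<String, Object> serializers;

    GenericSnapshot(String name, Map<String, Object> serializers) {
        this.name = name;
        this.serializers = Collections.unmodifiableMap(new HashMap<>(serializers));
    }
}

/** Hypothetical typed view that restores the old accessors on top of a generic snapshot. */
final class KeyedMetaInfoView {
    static final String NAMESPACE_SERIALIZER = "NAMESPACE_SERIALIZER";
    static final String VALUE_SERIALIZER = "VALUE_SERIALIZER";

    private final GenericSnapshot snapshot;

    KeyedMetaInfoView(GenericSnapshot snapshot) {
        this.snapshot = Objects.requireNonNull(snapshot);
    }

    Object getNamespaceSerializer() {
        return require(NAMESPACE_SERIALIZER);
    }

    Object getStateSerializer() {
        return require(VALUE_SERIALIZER);
    }

    /** A missing entry is a hard error, so a test cannot silently pass on an incomplete snapshot. */
    private Object require(String key) {
        Object serializer = snapshot.serializers.get(key);
        if (serializer == null) {
            throw new IllegalStateException("Missing serializer '" + key + "' in snapshot " + snapshot.name);
        }
        return serializer;
    }
}
```

In the test, the loop body would wrap each `StateMetaInfoSnapshot` in this way. It would then repeat the original `instanceof UnloadableDummyTypeSerializer` and config-snapshot assertions against the typed getters.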


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r202047581
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---
    --- End diff --
    
    Both styles are ok and used in Flink, so I will stick to this.


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r202033136
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java ---
    @@ -0,0 +1,412 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.metainfo;
    +
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.runtime.state.OperatorStateHandle;
    +import org.apache.flink.util.InstantiationUtil;
    +import org.apache.flink.util.Preconditions;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Static factory that gives out the writer and readers for different versions of {@link StateMetaInfoSnapshot}.
    + */
    +public class StateMetaInfoSnapshotReadersWriters {
    +
    +	/**
    +	 * Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}.
    +	 * - v5: Flink 1.6.x
    +	 */
    +	public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
    +
    +	/**
    +	 * Enum for backwards compatibility. This gives a hint about the expected state type for which a
    +	 * {@link StateMetaInfoSnapshot} should be deserialized.
    +	 *
    +	 * TODO this can go away after we eventually drop backwards compatibility with all versions < 5.
    +	 */
    +	public enum StateTypeHint {
    +		KEYED_STATE,
    +		OPERATOR_STATE
    +	}
    +
    +	/**
    +	 * Returns the writer for {@link StateMetaInfoSnapshot}.
    +	 */
    +	@Nonnull
    +	public static StateMetaInfoWriter getWriter() {
    +		return CurrentWriterImpl.INSTANCE;
    +	}
    +
    +	/**
    +	 * Returns a reader for {@link StateMetaInfoSnapshot} with the requested state type and version number.
    +	 *
    +	 * @param readVersion the format version to read.
    +	 * @param stateTypeHint a hint about the expected type to read.
    +	 * @return the requested reader.
    +	 */
    +	@Nonnull
    +	public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) {
    +
    +		if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
    +			// latest version shortcut
    +			return CurrentReaderImpl.INSTANCE;
    +		}
    +
    +		if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
    +			throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
    +		}
    +
    +		switch (stateTypeHint) {
    --- End diff --
    
    One small suggestion here: move all the legacy code into a separate package/class and leave only a call to the legacy reader factory here. That would slim this class down a bit and simplify cleaning up the legacy code later.
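
The sketch below illustrates the suggested split and mirrors the control flow of `getReader` in the hunk above. The current-version shortcut and the too-new check stay in place, and every older version is delegated to one legacy factory. `ReadersWritersSketch`, `LegacyMetaInfoReaders` and the string-returning `MetaInfoReader` interface are hypothetical placeholders for the real reader implementations.

```java
/** Hypothetical reader interface; a real reader would deserialize from an input view. */
interface MetaInfoReader {
    String describe();
}

/** Hypothetical home for all pre-v5 readers, so removing them later touches one class. */
final class LegacyMetaInfoReaders {
    private LegacyMetaInfoReaders() {
    }

    static MetaInfoReader forVersion(int readVersion, ReadersWritersSketch.StateTypeHint hint) {
        // Legacy formats need the hint to know which kind of snapshot to expect.
        switch (hint) {
            case KEYED_STATE:
                return () -> "legacy keyed reader v" + readVersion;
            case OPERATOR_STATE:
                return () -> "legacy operator reader v" + readVersion;
            default:
                throw new IllegalArgumentException("Unknown state type hint: " + hint);
        }
    }
}

final class ReadersWritersSketch {
    static final int CURRENT_VERSION = 5;

    enum StateTypeHint { KEYED_STATE, OPERATOR_STATE }

    private static final MetaInfoReader CURRENT = () -> "current reader v" + CURRENT_VERSION;

    static MetaInfoReader getReader(int readVersion, StateTypeHint hint) {
        if (readVersion == CURRENT_VERSION) {
            // As in the hunk above, the hint is not consulted for the current version.
            return CURRENT;
        }
        if (readVersion > CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
        }
        // The only place that knows about legacy formats.
        return LegacyMetaInfoReaders.forVersion(readVersion, hint);
    }
}
```

With this layout, dropping support for versions below 5 would mostly mean deleting `LegacyMetaInfoReaders` and the single delegating line, and eventually the hint enum.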


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r202041046
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java ---
    --- End diff --
    
    👍 


---

[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/6308
  
    @sihuazhou @azagrebin thanks guys for the fast reviews! Will address the comments and merge.


---

[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

Posted by azagrebin <gi...@git.apache.org>.
Github user azagrebin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6308#discussion_r201798531
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
    @@ -62,62 +67,62 @@ public int getVersion() {
     
     	@Override
     	public int[] getCompatibleVersions() {
    -		// we are compatible with version 3 (Flink 1.5.x), 2 (Flink 1.4.x, Flink 1.3.x) and version 1 (Flink 1.2.x)
    --- End diff --
    
    Was this comment redundant?


---