Posted to issues@flink.apache.org by StefanRRichter <gi...@git.apache.org> on 2016/08/30 13:34:59 UTC

[GitHub] flink pull request #2440: Keyed backend refactor

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2440

    Keyed backend refactor

    This pull request is a follow-up to the preliminary pull request #2376 and addresses all issues subsumed under [FLINK-3755].
    
    In addition to the changes from PR #2376, this PR adds:
    
    # 1) Refactoring of Key Value State
    
    Before, `AbstractStateBackend` was responsible both for checkpointing to streams and
    for keyed state. Now, this functionality is split up into `CheckpointStreamFactory` and
    `KeyedStateBackend`. The former is responsible for providing streams for writing checkpoints
    while the latter is only responsible for keeping keyed state. A `KeyedStateBackend` can
    write its content to a checkpoint. For this it uses a `CheckpointStreamFactory`.
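    
    The split can be pictured with a minimal sketch. The names `SketchStreamFactory`,
    `SketchKeyedBackend` and `SketchMemoryStreamFactory` are hypothetical stand-ins, not the
    interfaces in this PR, and the serialization format is made up for illustration:
    
    ```java
    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.io.UncheckedIOException;
    import java.util.Map;
    import java.util.TreeMap;
    
    // Hypothetical stand-in: only knows how to provide streams for checkpoint data.
    interface SketchStreamFactory {
        OutputStream createStream(long checkpointId);
    }
    
    // Hypothetical stand-in: only keeps keyed state and knows nothing about
    // where checkpoint data ends up.
    final class SketchKeyedBackend {
        private final Map<String, Long> state = new TreeMap<>();
    
        void put(String key, long value) {
            state.put(key, value);
        }
    
        // Writes the current content to a stream obtained from the factory.
        void snapshot(long checkpointId, SketchStreamFactory factory) {
            try (DataOutputStream out = new DataOutputStream(factory.createStream(checkpointId))) {
                out.writeInt(state.size());
                for (Map.Entry<String, Long> entry : state.entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeLong(entry.getValue());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
    
    // One possible factory: streams that write to an in-memory byte array.
    final class SketchMemoryStreamFactory implements SketchStreamFactory {
        final ByteArrayOutputStream lastStream = new ByteArrayOutputStream();
    
        @Override
        public OutputStream createStream(long checkpointId) {
            lastStream.reset();
            return lastStream;
        }
    }
    ```
    
    Swapping in a file-based factory would change where the bytes go without touching the
    keyed state code, which is the point of the separation.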
    
    # 2) Introduction of key-group aware `KeyedStateBackend`s
    
    ## a) HeapKeyedStateBackend
    `HeapKeyedStateBackend` subsumes the keyed state part of both `FsStateBackend` and
    `MemoryStateBackend`. The only difference between the two now
    is that one produces a `CheckpointStreamFactory` that produces streams for writing to files
    while the other provides streams that write to in-memory byte arrays.
    
    Also, this introduces another layer of lookup in the `HeapKeyedStateBackend` to accommodate
    storing state per key group. Upon checkpointing the data is written out in a format that
    is very similar to the new RocksDB backend. We should make these 100% compatible as
    a follow-up.
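    
    As a rough sketch of the extra lookup layer (the class `KeyGroupedHeapTable` is hypothetical,
    and the plain `hashCode` modulo is an assumption standing in for the real key-group assignment):
    
    ```java
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical heap table with one additional lookup level per key group.
    final class KeyGroupedHeapTable<K, V> {
        private final int numberOfKeyGroups;
        // key-group index -> (key -> value)
        private final Map<Integer, Map<K, V>> groups = new HashMap<>();
    
        KeyGroupedHeapTable(int numberOfKeyGroups) {
            if (numberOfKeyGroups <= 0) {
                throw new IllegalArgumentException("numberOfKeyGroups must be positive");
            }
            this.numberOfKeyGroups = numberOfKeyGroups;
        }
    
        // Assumption: a non-negative modulo of hashCode stands in for the real assignment.
        int keyGroupOf(K key) {
            return Math.floorMod(key.hashCode(), numberOfKeyGroups);
        }
    
        void put(K key, V value) {
            groups.computeIfAbsent(keyGroupOf(key), g -> new HashMap<>()).put(key, value);
        }
    
        V get(K key) {
            Map<K, V> group = groups.get(keyGroupOf(key));
            return group == null ? null : group.get(key);
        }
    
        // All entries of one key group, i.e. the unit that a snapshot writes out together.
        Map<K, V> entriesOfKeyGroup(int keyGroup) {
            return groups.getOrDefault(keyGroup, Collections.<K, V>emptyMap());
        }
    }
    ```
    
    Grouping entries this way lets a snapshot write one key group after another, so that a
    restore can pick up only the key groups it is responsible for.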
    
    ## b) RocksDBKeyedStateBackend
    
    The RocksDB state backend is now also key-group aware. This happens through an additional 
    1-2 byte key-group prefix that is added to each key. On snapshots, the key value states 
    for different key-groups are combined through `RocksDBMergeIterator`. All snapshots from
    this backend now run fully asynchronously using an implementation of
    `AbstractAsyncIOCallable`.
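    
    The prefixing idea can be sketched as follows. `KeyGroupPrefix` is a hypothetical helper, and
    both the big-endian layout and the 256 threshold for switching from one to two bytes are
    assumptions of this sketch, not a description of the actual on-disk format:
    
    ```java
    // Hypothetical helper that puts a 1-2 byte key-group prefix in front of a serialized key.
    final class KeyGroupPrefix {
    
        // Assumption: one byte while all key-group indices fit into 8 bits, two bytes otherwise.
        static int prefixLength(int numberOfKeyGroups) {
            return numberOfKeyGroups > 256 ? 2 : 1;
        }
    
        // Result layout: key-group index (big-endian) followed by the serialized key bytes.
        static byte[] prefixedKey(int keyGroup, int numberOfKeyGroups, byte[] serializedKey) {
            int len = prefixLength(numberOfKeyGroups);
            byte[] out = new byte[len + serializedKey.length];
            for (int i = 0; i < len; i++) {
                out[i] = (byte) (keyGroup >>> (8 * (len - 1 - i)));
            }
            System.arraycopy(serializedKey, 0, out, len, serializedKey.length);
            return out;
        }
    
        // Reads the key-group index back from the prefix.
        static int keyGroupOf(byte[] prefixedKey, int numberOfKeyGroups) {
            int len = prefixLength(numberOfKeyGroups);
            int keyGroup = 0;
            for (int i = 0; i < len; i++) {
                keyGroup = (keyGroup << 8) | (prefixedKey[i] & 0xFF);
            }
            return keyGroup;
        }
    }
    ```
    
    With a most-significant-byte-first prefix, a store that orders keys bytewise keeps all
    entries of one key group next to each other.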
    
    # Refactoring of asynchronous snapshot facilities
    
    Snapshots are now driven by `AsyncCheckpointRunnable`s in `StreamTask`, which are executed
    through a thread pool. Each `AsyncCheckpointRunnable` is created with a
    `RunnableFuture<KeyGroupsStateHandle>` that is obtained from `KeyedStateBackend` through
    
    ```
    public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
    		long checkpointId,
    		long timestamp,
    		CheckpointStreamFactory streamFactory) throws Exception;
    ``` 
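    
    The calling pattern can be illustrated with plain `java.util.concurrent` classes. This is a
    sketch only: `AsyncSnapshotSketch` is hypothetical, a `String` stands in for
    `KeyGroupsStateHandle`, and the stream factory argument is left out:
    
    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.TimeUnit;
    
    final class AsyncSnapshotSketch {
    
        // Stand-in for snapshot(...): returns the pending work without running it.
        static RunnableFuture<String> snapshot(long checkpointId, long timestamp) {
            return new FutureTask<String>(() -> "handle-" + checkpointId + "@" + timestamp);
        }
    
        // Stand-in for the task side: hands the future to a thread pool and waits for the result.
        static String runOnPool(long checkpointId, long timestamp) {
            ExecutorService pool = Executors.newSingleThreadExecutor();
            try {
                RunnableFuture<String> future = snapshot(checkpointId, timestamp);
                pool.execute(future); // the snapshot work only starts here
                return future.get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("snapshot did not complete", e);
            } finally {
                pool.shutdown();
            }
        }
    }
    ```
    
    Returning a `RunnableFuture` leaves the decision of when and on which thread the snapshot
    runs to the caller, and the same object can be used to wait for or cancel the result.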
    
    # Review comments on #2376
    
    Based on the review comments on PR #2376, we made the following changes:
    
    ## In comparison to PR #2376, we dropped the KeyGroupAssigner interface in favor of static methods. 
    The reason for this is that the code relies on a consistent key group assignment in several places.
    ## By default, the max parallelism is chosen as 
    ```
    Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2))
    ```
    ## No blocking on the termination of async snapshot threads.
    ## Reduced logging.
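    
    To make the default max parallelism formula from the list above concrete, here is a small
    sketch that evaluates it as written. `DefaultMaxParallelismSketch` is hypothetical, and the
    behavior of `roundToNextPowerOfTwo` (smallest power of two that is not below the argument)
    is an assumption, since the helper itself is not shown here:
    
    ```java
    final class DefaultMaxParallelismSketch {
    
        static final int UPPER_BOUND = 128;
    
        // Assumption: rounds up to the smallest power of two >= x; intended for small positive x.
        static int roundToNextPowerOfTwo(int x) {
            if (x <= 1) {
                return 1;
            }
            return Integer.highestOneBit(x - 1) << 1;
        }
    
        // Evaluates the formula exactly as quoted in the description.
        static int defaultMaxParallelism(int parallelism) {
            return Math.min(UPPER_BOUND, roundToNextPowerOfTwo(parallelism + parallelism / 2));
        }
    }
    ```
    
    Under these assumptions a parallelism of 4 gives 8, a parallelism of 10 gives 16, and a
    parallelism of 100 is capped at 128.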
    
    # Limitations
    
    Currently, queryable state is not key-group aware and `QueryableStateITCase` is ignored. This will
    be addressed in follow-up work.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink keyed-backend-refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2440.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2440
    
----
commit 40484f3a66558b40bcf5bcaae3e3dba28d73f8dd
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-07-28T13:08:24Z

    [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
    
    This introduces a new KeySelector that assigns keys to key groups and
    also adds the max parallelism parameter throughout all API levels.
    
    This also adds tests for the newly introduced features.

commit 3609f29076dd504ab9790d874fe3e3d4828f6b77
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-08-10T16:44:50Z

    [FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
    
    The biggest change in this is that functionality that used to be in
    AbstractStateBackend is now moved to CheckpointStreamFactory and
    KeyedStateBackend. The former is responsible for providing streams that
    can be used to checkpoint data while the latter is responsible for
    keeping keyed state. A keyed backend can checkpoint the state that it
    keeps by using a CheckpointStreamFactory.
    
    This also refactors how asynchronous keyed state snapshots work. They
    are now implemented using a Future/RunnableFuture.
    
    Also, this changes the keyed state backends to be key-group aware and to
    snapshot the state in key-groups with an index for restoring.

commit 9d675ca0707b31923108ea78908b63fc46798c97
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-11T09:59:07Z

    [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

commit d99b75c70bb15fe6ee5c06968d92b075e9b6c772
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-08-11T10:14:18Z

    [FLINK-4380] Add tests for new Key-Group/Max-Parallelism
    
    This tests the rescaling features in CheckpointCoordinator and
    SavepointCoordinator.

commit 1d514b8d5a8db663e1be293b9653c42d45787e36
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-17T12:50:18Z

    [FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware
    
    This change makes the RocksDB backend key-group aware by building on the
    changes in the previous commit.

commit 4f791d17f727a2fead12224652747b73d98ffaa1
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2016-08-25T12:09:12Z

    Ignore QueryableStateITCase
    
    This doesn't work yet because the state query machinery is not yet
    properly aware of key-grouped state.

commit f47af43bd7b5e54086ab2ce87b9471cb99a38421
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-29T08:02:31Z

    Introduced timeout of thread pool for testing. Removed legacy code path from HashKeyGroupAssigner

commit 52061346e39cbc591e79bad2b6a4d9ce272bb558
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-29T09:53:22Z

    Stephan's feedback: remove KeyGroupAssigner in favor of a static method and have default max. parallelism at 128

commit d77475a69dfdfae092107562af5c96af8de370cb
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-29T14:10:15Z

    Improved test stability

commit 1252a246bee6bc181b2def83966d121b1ca5688e
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-30T10:26:48Z

    Improved HeapKeyedStateBackend for more compact snapshots.

commit 3f282619c6dbf48a246bf848be2e92a4cad8bd2d
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-30T11:45:40Z

    Expose max parallelism through StreamExecutionEnvironment

commit 1fa0e02f33722029159b39625127758fcb3623d3
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-30T12:47:34Z

    test fix

commit 257992bf3ead38d12775b077478b84f8690c7fb9
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-30T12:59:12Z

    Extended EventTimeWindowCheckpointITCase to test the boundaries of maxParallelism.

commit ea26c0f2e9f1687b5ae89a1acaeaea681c19bd80
Author: Stefan Richter <s....@data-artisans.com>
Date:   2016-08-30T13:27:25Z

    Reset WindowCheckpointingITCase to (sometimes) failing version.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2440#discussion_r77956781
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +public final class KeyGroupRangeAssignment {
    +
    +	public static final int DEFAULT_MAX_PARALLELISM = 128;
    +
    +	private KeyGroupRangeAssignment() {
    +		throw new AssertionError();
    +	}
    +
    +	/**
    +	 * Assigns the given key to a parallel operator index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @param parallelism the current parallelism of the operator
    +	 * @return the index of the parallel operator to which the given key should be routed.
    +	 */
    +	public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    +		return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    +	}
    +
    +	/**
    +	 * Assigns the given key to a key-group index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @return the key-group to which the given key is assigned
    +	 */
    +	public static final int assignToKeyGroup(Object key, int maxParallelism) {
    +		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    +	}
    +
    +	/**
    +	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
    +	 * parallelism.
    +	 *
    +	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
    +	 * to go beyond this boundary, this method must perform arithmetic on long values.
    +	 *
    +	 * @param maxParallelism Maximal parallelism that the job was initially created with.
    +	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
    +	 * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < maxParallelism.
    +	 * @return
    +	 */
    +	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
    +			int maxParallelism,
    +			int parallelism,
    +			int operatorIndex) {
    +		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
    +		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
    +		Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
    +
    +		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    +		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    --- End diff --
    
    Ah I see. Now it makes sense :-)



[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2440#discussion_r77856665
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---
    @@ -0,0 +1,97 @@
    +		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    +		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    --- End diff --
    
    Can't we simplify this expression to
    ```
    int start = (operatorIndex * maxParallelism) / parallelism;
    int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
    ```



[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter closed the pull request at:

    https://github.com/apache/flink/pull/2440



[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2440#discussion_r77870173
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---
    @@ -0,0 +1,97 @@
    +		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    +		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    --- End diff --
    
    I don't think that this is giving us the correct inverse for computeOperatorIndexForKeyGroup(...). Our test CheckpointCoordinatorTest::testCreateKeyGroupPartitions() generates counter-examples.
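    
    A small standalone check can make such counter-examples visible. This is only a sketch:
    `KeyGroupRangeCheck` is hypothetical, and the forward mapping
    `keyGroup * parallelism / maxParallelism` is an assumption, because
    computeOperatorIndexForKeyGroup(...) is not part of the quoted diff:
    
    ```java
    final class KeyGroupRangeCheck {
    
        // Assumed forward mapping from key group to operator index (not shown in the diff).
        static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
            return keyGroup * parallelism / maxParallelism;
        }
    
        // Range bounds as written in the quoted diff.
        static int[] rangeFromDiff(int maxParallelism, int parallelism, int operatorIndex) {
            int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
            int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
            return new int[] {start, end};
        }
    
        // Range bounds of the proposed simplification.
        static int[] rangeSimplified(int maxParallelism, int parallelism, int operatorIndex) {
            int start = (operatorIndex * maxParallelism) / parallelism;
            int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
            return new int[] {start, end};
        }
    
        // Returns the first key group that lies in an operator's range but maps to a
        // different operator under the assumed forward mapping, or -1 if there is none.
        static int firstMismatch(int maxParallelism, int parallelism, boolean simplified) {
            for (int op = 0; op < parallelism; op++) {
                int[] range = simplified
                        ? rangeSimplified(maxParallelism, parallelism, op)
                        : rangeFromDiff(maxParallelism, parallelism, op);
                for (int keyGroup = range[0]; keyGroup <= range[1]; keyGroup++) {
                    if (operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup) != op) {
                        return keyGroup;
                    }
                }
            }
            return -1;
        }
    }
    ```
    
    Under that assumption, maxParallelism = 10 and parallelism = 3 already show the difference:
    the simplified bounds put key group 3 into the range of operator 1, while 3 * 3 / 10 = 0
    routes it to operator 0. The two variants agree when maxParallelism is a multiple of
    parallelism.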



[GitHub] flink issue #2440: [FLINK-3755] Introduce key groups for key-value state to ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/2440
  
    I don't think that this is giving us the correct inverse for `computeOperatorIndexForKeyGroup(...)`. Our test `CheckpointCoordinatorTest::testCreateKeyGroupPartitions()` generates counter-examples.

