Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/09/07 16:24:20 UTC

[jira] [Commented] (FLINK-3755) Introduce key groups for key-value state to support dynamic scaling

    [ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15471057#comment-15471057 ] 

ASF GitHub Bot commented on FLINK-3755:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2440#discussion_r77856665
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +public final class KeyGroupRangeAssignment {
    +
    +	public static final int DEFAULT_MAX_PARALLELISM = 128;
    +
    +	private KeyGroupRangeAssignment() {
    +		throw new AssertionError();
    +	}
    +
    +	/**
    +	 * Assigns the given key to a parallel operator index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @param parallelism the current parallelism of the operator
    +	 * @return the index of the parallel operator to which the given key should be routed.
    +	 */
    +	public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    +		return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    +	}
    +
    +	/**
    +	 * Assigns the given key to a key-group index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @return the key-group to which the given key is assigned
    +	 */
    +	public static final int assignToKeyGroup(Object key, int maxParallelism) {
    +		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    +	}
    +
    +	/**
    +	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
    +	 * parallelism.
    +	 *
    +	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
    +	 * to go beyond this boundary, this method must perform arithmetic on long values.
    +	 *
    +	 * @param maxParallelism Maximal parallelism that the job was initially created with.
    +	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
    +	 * @param operatorIndex  Index of the parallel operator instance. 0 <= operatorIndex < parallelism.
    +	 * @return the range of key-groups assigned to the operator with the given index
    +	 */
    +	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
    +			int maxParallelism,
    +			int parallelism,
    +			int operatorIndex) {
    +		Preconditions.checkArgument(parallelism > 0, "Parallelism must be larger than zero.");
    +		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
    +		Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must not be larger than 2^15.");
    +
    +		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    +		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    --- End diff --
    
    Can't we simplify this expression to
    ```
    int start = (operatorIndex * maxParallelism) / parallelism;
    int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
    ```
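A quick numeric check suggests the proposed expressions are not a drop-in replacement. Writing i = operatorIndex, M = maxParallelism and P = parallelism, the diff computes the range [ceil(i*M/P), ceil((i+1)*M/P) - 1], while the proposal computes [floor(i*M/P), floor((i+1)*M/P) - 1]. The standalone sketch below (the class and method names are illustrative only, not part of the PR) evaluates both formulas for maxParallelism = 10 and parallelism = 3.

```java
import java.util.Arrays;

/** Illustrative check: range formula from the diff vs. the simplification proposed in the review. */
public class KeyGroupRangeCheck {

    /** Range as computed in the diff: [ceil(i*M/P), ceil((i+1)*M/P) - 1]. */
    static int[] originalRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Range as proposed in the review: [floor(i*M/P), floor((i+1)*M/P) - 1]. */
    static int[] proposedRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 10; // not a multiple of parallelism, so the formulas can differ
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("operator " + i
                    + " original=" + Arrays.toString(originalRange(maxParallelism, parallelism, i))
                    + " proposed=" + Arrays.toString(proposedRange(maxParallelism, parallelism, i)));
        }
    }
}
```

For this case the diff assigns [0, 3], [4, 6], [7, 9] to operators 0, 1, 2, while the proposal assigns [0, 2], [3, 5], [6, 9]. Both split [0, maxParallelism) into contiguous, non-overlapping ranges, and they coincide whenever maxParallelism is a multiple of parallelism; otherwise the leftover key group lands on a different operator. Adopting the simplification would therefore presumably also require keeping computeOperatorIndexForKeyGroup (whose body is not shown in this hunk) consistent with it. In both variants the largest product, (operatorIndex + 1) * maxParallelism, is at most maxParallelism^2 <= 2^30, so int arithmetic does not overflow under the maxParallelism <= 2^15 precondition.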


> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
>                 Key: FLINK-3755
>                 URL: https://issues.apache.org/jira/browse/FLINK-3755
>             Project: Flink
>          Issue Type: New Feature
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.2.0
>
>
> In order to support dynamic scaling, it is necessary to sub-partition the key-value states of each operator. This sub-partitioning, which produces a set of key groups, makes it easy to scale Flink jobs in and out by reassigning the key groups to the new set of subtasks. The idea of key groups is described in this design document [1].
> [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing
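The rescaling idea can be illustrated with the range formula from KeyGroupRangeAssignment in the diff above. Because assignToKeyGroup depends only on maxParallelism, a key keeps its key group when the parallelism changes; only the ownership of whole key groups moves between subtasks. The sketch below is a hypothetical standalone class (not part of the PR) that uses a deliberately small maxParallelism of 8 instead of the default 128 and prints which operator owns each key group before and after scaling out from 2 to 4 subtasks.

```java
/**
 * Hypothetical sketch: reassignment of key groups when an operator is rescaled.
 * Ownership is derived from the key-group range formula in the PR's diff.
 */
public class KeyGroupRescaleSketch {

    /** Returns the operator index whose key-group range contains keyGroup. */
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        for (int op = 0; op < parallelism; op++) {
            // Same start/end expressions as computeKeyGroupRangeForOperatorIndex in the diff.
            int start = op == 0 ? 0 : ((op * maxParallelism - 1) / parallelism) + 1;
            int end = ((op + 1) * maxParallelism - 1) / parallelism;
            if (keyGroup >= start && keyGroup <= end) {
                return op;
            }
        }
        throw new IllegalArgumentException("key group out of range: " + keyGroup);
    }

    public static void main(String[] args) {
        int maxParallelism = 8; // small value for readable output
        int oldParallelism = 2;
        int newParallelism = 4;
        for (int kg = 0; kg < maxParallelism; kg++) {
            System.out.println("key group " + kg + ": operator "
                    + ownerOf(kg, maxParallelism, oldParallelism) + " -> "
                    + ownerOf(kg, maxParallelism, newParallelism));
        }
    }
}
```

With these numbers, key groups 0-3 start on operator 0 and 4-7 on operator 1. After scaling out, operator 0 keeps 0-1, operator 1 takes 2-3, operator 2 takes 4-5 and operator 3 takes 6-7, so in this example each new subtask restores its state from exactly one previous subtask, at key-group granularity.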



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)