Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2016/09/07 16:23:30 UTC

[GitHub] flink pull request #2440: [FLINK-3755] Introduce key groups for key-value st...

GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2440#discussion_r77856665
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java ---
    @@ -0,0 +1,97 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state;
    +
    +import org.apache.flink.util.MathUtils;
    +import org.apache.flink.util.Preconditions;
    +
    +public final class KeyGroupRangeAssignment {
    +
    +	public static final int DEFAULT_MAX_PARALLELISM = 128;
    +
    +	private KeyGroupRangeAssignment() {
    +		throw new AssertionError();
    +	}
    +
    +	/**
    +	 * Assigns the given key to a parallel operator index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @param parallelism the current parallelism of the operator
    +	 * @return the index of the parallel operator to which the given key should be routed.
    +	 */
    +	public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    +		return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
    +	}
    +
    +	/**
    +	 * Assigns the given key to a key-group index.
    +	 *
    +	 * @param key the key to assign
    +	 * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
    +	 * @return the key-group to which the given key is assigned
    +	 */
    +	public static final int assignToKeyGroup(Object key, int maxParallelism) {
    +		return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    +	}
    +
    +	/**
    +	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
    +	 * parallelism.
    +	 *
    +	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
    +	 * to go beyond this boundary, this method must perform arithmetic on long values.
    +	 *
    +	 * @param maxParallelism Maximal parallelism that the job was initially created with.
    +	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
    +	 * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < maxParallelism.
    +	 * @return
    +	 */
    +	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
    +			int maxParallelism,
    +			int parallelism,
    +			int operatorIndex) {
    +		Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
    +		Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
    +		Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
    +
    +		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
    +		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    --- End diff --
    
    Can't we simplify this expression to
    ```
    int start = (operatorIndex * maxParallelism) / parallelism;
    int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
    ```
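One way to evaluate the suggestion is to compare both formulas numerically. Write M for maxParallelism and P for parallelism. In integer division, (n - 1) / P + 1 equals ceil(n / P) for n >= 1, so the expression in the diff yields the range [ceil(op * M / P), ceil((op + 1) * M / P) - 1]. The proposed version yields [floor(op * M / P), floor((op + 1) * M / P) - 1]. Both tile [0, M) without gaps, but they place the boundaries identically only when P divides M. Which one is right depends on the inverse mapping from key group to operator index. The body of computeOperatorIndexForKeyGroup is not part of the quoted diff, so the sketch below assumes it is keyGroup * parallelism / maxParallelism. The class KeyGroupRangeCheck and all of its methods are hypothetical helpers written only for this comparison.

```java
import java.util.Arrays;

// Hypothetical helper (not part of Flink) comparing the key-group range formula
// from the diff with the simplification proposed in the review.
public final class KeyGroupRangeCheck {

    interface RangeFn {
        int[] apply(int maxParallelism, int parallelism, int operatorIndex);
    }

    // Formula from the diff: [ceil(op * M / P), ceil((op + 1) * M / P) - 1].
    static int[] originalRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Proposed simplification: [floor(op * M / P), floor((op + 1) * M / P) - 1].
    static int[] proposedRange(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism) / parallelism - 1;
        return new int[] {start, end};
    }

    // ASSUMPTION: the inverse mapping (key group -> operator index) is floor(kg * P / M).
    static int operatorForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // True if the ranges for operators 0..P-1 tile [0, M) contiguously, with no gaps or overlaps.
    static boolean isContiguousPartition(RangeFn fn, int maxParallelism, int parallelism) {
        int expectedStart = 0;
        for (int op = 0; op < parallelism; op++) {
            int[] r = fn.apply(maxParallelism, parallelism, op);
            if (r[0] != expectedStart || r[1] < r[0]) {
                return false;
            }
            expectedStart = r[1] + 1;
        }
        return expectedStart == maxParallelism;
    }

    // True if each operator's range holds exactly the key groups the assumed inverse sends to it.
    static boolean consistentWithInverse(RangeFn fn, int maxParallelism, int parallelism) {
        for (int op = 0; op < parallelism; op++) {
            int[] r = fn.apply(maxParallelism, parallelism, op);
            for (int kg = 0; kg < maxParallelism; kg++) {
                boolean inRange = kg >= r[0] && kg <= r[1];
                boolean mapped = operatorForKeyGroup(maxParallelism, parallelism, kg) == op;
                if (inRange != mapped) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int maxParallelism = 10;
        int parallelism = 3;
        for (int op = 0; op < parallelism; op++) {
            System.out.println("op " + op
                    + ": original " + Arrays.toString(originalRange(maxParallelism, parallelism, op))
                    + ", proposed " + Arrays.toString(proposedRange(maxParallelism, parallelism, op)));
        }
        System.out.println("original consistent: "
                + consistentWithInverse(KeyGroupRangeCheck::originalRange, maxParallelism, parallelism));
        System.out.println("proposed consistent: "
                + consistentWithInverse(KeyGroupRangeCheck::proposedRange, maxParallelism, parallelism));
    }
}
```

With maxParallelism = 10 and parallelism = 3, main prints the original ranges [0, 3], [4, 6], [7, 9] next to the proposed [0, 2], [3, 5], [6, 9]. Under the assumed inverse, key group 3 maps to operator 0 (3 * 3 / 10 = 0), so only the original ranges agree with it. With maxParallelism = 128 and parallelism = 4, the two formulas coincide.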

