You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:40 UTC
[22/27] flink git commit: [FLINK-4380] Remove KeyGroupAssigner in
favor of static method/Have default max. parallelism at 128
[FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have default max. parallelism at 128
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d430618
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d430618
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d430618
Branch: refs/heads/master
Commit: 6d4306186e09be6f2557600ed7a853c33d3ae6bd
Parents: 2b7a8d6
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Aug 29 11:53:22 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:10:01 2016 +0200
----------------------------------------------------------------------
.../streaming/state/AbstractRocksDBState.java | 3 +-
.../state/RocksDBKeyedStateBackend.java | 9 +-
.../streaming/state/RocksDBStateBackend.java | 9 +-
.../state/RocksDBStateBackendConfigTest.java | 9 +-
.../api/common/state/KeyGroupAssigner.java | 53 -----
.../java/org/apache/flink/util/MathUtils.java | 15 ++
.../org/apache/flink/util/MathUtilTest.java | 31 +++
.../checkpoint/CheckpointCoordinator.java | 3 +-
.../flink/runtime/jobgraph/JobVertex.java | 3 +-
.../runtime/state/AbstractStateBackend.java | 5 +-
.../runtime/state/HashKeyGroupAssigner.java | 66 ------
.../flink/runtime/state/KeyGroupRange.java | 42 ----
.../runtime/state/KeyGroupRangeAssignment.java | 97 +++++++++
.../flink/runtime/state/KeyedStateBackend.java | 17 +-
.../state/filesystem/FsStateBackend.java | 9 +-
.../runtime/state/heap/AbstractHeapState.java | 4 +-
.../state/heap/HeapKeyedStateBackend.java | 9 +-
.../flink/runtime/state/heap/HeapListState.java | 3 +-
.../state/memory/MemoryStateBackend.java | 9 +-
.../checkpoint/CheckpointCoordinatorTest.java | 3 +-
.../runtime/query/QueryableStateClientTest.java | 4 +-
.../runtime/query/netty/KvStateClientTest.java | 5 +-
.../query/netty/KvStateServerHandlerTest.java | 16 +-
.../runtime/query/netty/KvStateServerTest.java | 6 +-
.../runtime/state/StateBackendTestBase.java | 49 ++---
.../streaming/api/datastream/KeyedStream.java | 5 +-
.../flink/streaming/api/graph/StreamConfig.java | 22 +-
.../api/graph/StreamGraphGenerator.java | 24 ++-
.../api/graph/StreamingJobGraphGenerator.java | 9 +-
.../api/operators/AbstractStreamOperator.java | 5 +-
.../partitioner/KeyGroupStreamPartitioner.java | 24 +--
.../streaming/runtime/tasks/StreamTask.java | 7 +-
.../api/graph/StreamGraphGeneratorTest.java | 26 +--
.../graph/StreamingJobGraphGeneratorTest.java | 200 -------------------
.../operators/StreamingRuntimeContextTest.java | 5 +-
...AlignedProcessingTimeWindowOperatorTest.java | 19 +-
.../KeyGroupStreamPartitionerTest.java | 3 +-
.../tasks/OneInputStreamTaskTestHarness.java | 3 +-
.../KeyedOneInputStreamOperatorTestHarness.java | 16 +-
.../util/OneInputStreamOperatorTestHarness.java | 4 -
.../test/checkpointing/RescalingITCase.java | 27 +--
.../streaming/runtime/StateBackendITCase.java | 5 +-
42 files changed, 297 insertions(+), 586 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index cbc2757..e878ad5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
@@ -130,7 +131,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
backend.getKeySerializer(),
namespaceSerializer);
- int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
+ int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
return backend.db.get(columnFamily, keySerializationStream.toByteArray());
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a1634b2..177c09f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -21,7 +21,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -131,11 +130,11 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
ColumnFamilyOptions columnFamilyOptions,
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange
) throws Exception {
- super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+ super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
this.operatorIdentifier = operatorIdentifier;
this.jobId = jobId;
@@ -183,7 +182,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
ColumnFamilyOptions columnFamilyOptions,
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoreState
) throws Exception {
@@ -195,7 +194,7 @@ public class RocksDBKeyedStateBackend<K> extends KeyedStateBackend<K> {
columnFamilyOptions,
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange);
LOG.info("Initializing RocksDB keyed state backend from snapshot.");
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index f950751..0fdbd5f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
@@ -230,7 +229,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -246,7 +245,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
getColumnOptions(),
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange);
}
@@ -254,7 +253,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -270,7 +269,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
getColumnOptions(),
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
restoredState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index acf6cb8..3b851be 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.OperatingSystem;
import org.junit.Assume;
@@ -93,7 +92,7 @@ public class RocksDBStateBackendConfigTest {
env.getJobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ 1,
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());
@@ -147,7 +146,7 @@ public class RocksDBStateBackendConfigTest {
env.getJobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ 1,
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());
@@ -182,7 +181,7 @@ public class RocksDBStateBackendConfigTest {
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ 1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
}
@@ -224,7 +223,7 @@ public class RocksDBStateBackendConfigTest {
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ 1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
deleted file mode 100644
index bb0691e..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.state;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-
-/**
- * Assigns a key to a key group index. A key group is the smallest unit of partitioned state
- * which is assigned to an operator. An operator can be assigned multiple key groups.
- *
- * @param <K> Type of the key
- */
-@Internal
-public interface KeyGroupAssigner<K> extends Serializable {
- /**
- * Calculates the key group index for the given key.
- *
- * @param key Key to be used
- * @return Key group index for the given key
- */
- int getKeyGroupIndex(K key);
-
- /**
- * Setups the key group assigner with the maximum parallelism (= number of key groups).
- *
- * @param numberOfKeygroups Maximum parallelism (= number of key groups)
- */
- void setup(int numberOfKeygroups);
-
- /**
- *
- * @return configured maximum parallelism
- */
- int getNumberKeyGroups();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
index f40c83a..49056cc 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MathUtils.java
@@ -155,6 +155,21 @@ public final class MathUtils {
}
}
+ /**
+ * Rounds the given number up to the next power of two; a number that already is a power of two is returned unchanged.
+ * @param x number to round
+ * @return x rounded up to the next power of two
+ */
+ public static int roundUpToPowerOfTwo(int x) {
+ x = x - 1;
+ x |= x >> 1;
+ x |= x >> 2;
+ x |= x >> 4;
+ x |= x >> 8;
+ x |= x >> 16;
+ return x + 1;
+ }
+
// ============================================================================================
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
index 7917a7b..c98b7fc 100644
--- a/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java
@@ -81,6 +81,37 @@ public class MathUtilTest {
}
@Test
+ public void testRoundUpToPowerOf2() {
+ assertEquals(0, MathUtils.roundUpToPowerOfTwo(0));
+ assertEquals(1, MathUtils.roundUpToPowerOfTwo(1));
+ assertEquals(2, MathUtils.roundUpToPowerOfTwo(2));
+ assertEquals(4, MathUtils.roundUpToPowerOfTwo(3));
+ assertEquals(4, MathUtils.roundUpToPowerOfTwo(4));
+ assertEquals(8, MathUtils.roundUpToPowerOfTwo(5));
+ assertEquals(8, MathUtils.roundUpToPowerOfTwo(6));
+ assertEquals(8, MathUtils.roundUpToPowerOfTwo(7));
+ assertEquals(8, MathUtils.roundUpToPowerOfTwo(8));
+ assertEquals(16, MathUtils.roundUpToPowerOfTwo(9));
+ assertEquals(16, MathUtils.roundUpToPowerOfTwo(15));
+ assertEquals(16, MathUtils.roundUpToPowerOfTwo(16));
+ assertEquals(32, MathUtils.roundUpToPowerOfTwo(17));
+ assertEquals(32, MathUtils.roundUpToPowerOfTwo(31));
+ assertEquals(32, MathUtils.roundUpToPowerOfTwo(32));
+ assertEquals(64, MathUtils.roundUpToPowerOfTwo(33));
+ assertEquals(64, MathUtils.roundUpToPowerOfTwo(42));
+ assertEquals(64, MathUtils.roundUpToPowerOfTwo(63));
+ assertEquals(64, MathUtils.roundUpToPowerOfTwo(64));
+ assertEquals(128, MathUtils.roundUpToPowerOfTwo(125));
+ assertEquals(32768, MathUtils.roundUpToPowerOfTwo(25654));
+ assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(34366363));
+ assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108863));
+ assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108864));
+ assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFE));
+ assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFF));
+ assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x40000000));
+ }
+
+ @Test
public void testPowerOfTwo() {
assertTrue(MathUtils.isPowerOf2(1));
assertTrue(MathUtils.isPowerOf2(2));
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e751e08..52f6d9a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
@@ -886,7 +887,7 @@ public class CheckpointCoordinator {
List<KeyGroupRange> result = new ArrayList<>(parallelism);
int start = 0;
for (int i = 0; i < parallelism; ++i) {
- result.add(KeyGroupRange.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
+ result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
}
return result;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index a623295..8ddc9f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -253,7 +253,8 @@ public class JobVertex implements java.io.Serializable {
*/
public void setMaxParallelism(int maxParallelism) {
org.apache.flink.util.Preconditions.checkArgument(
- maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1.");
+ maxParallelism > 0 && maxParallelism <= (1 << 15),
+ "The max parallelism must be at least 1 and smaller than 2^15.");
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index e6093a8..0d2bf45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -53,7 +52,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception;
@@ -66,7 +65,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
deleted file mode 100644
index 9ee4b90..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.KeyGroupAssigner;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Hash based key group assigner. The assigner assigns each key to a key group using the hash value
- * of the key.
- *
- * @param <K> Type of the key
- */
-public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
- private static final long serialVersionUID = -6319826921798945448L;
-
- private static final int UNDEFINED_NUMBER_KEY_GROUPS = Integer.MIN_VALUE;
-
- private int numberKeyGroups;
-
- public HashKeyGroupAssigner() {
- this(UNDEFINED_NUMBER_KEY_GROUPS);
- }
-
- public HashKeyGroupAssigner(int numberKeyGroups) {
- Preconditions.checkArgument(numberKeyGroups > 0 || numberKeyGroups == UNDEFINED_NUMBER_KEY_GROUPS,
- "The number of key groups has to be greater than 0 or undefined. Use " +
- "setMaxParallelism() to specify the number of key groups.");
- this.numberKeyGroups = numberKeyGroups;
- }
-
- public int getNumberKeyGroups() {
- return numberKeyGroups;
- }
-
- @Override
- public int getKeyGroupIndex(K key) {
- return MathUtils.murmurHash(key.hashCode()) % numberKeyGroups;
- }
-
- @Override
- public void setup(int numberOfKeygroups) {
- Preconditions.checkArgument(numberOfKeygroups > 0, "The number of key groups has to be " +
- "greater than 0. Use setMaxParallelism() to specify the number of key " +
- "groups.");
-
- this.numberKeyGroups = numberOfKeygroups;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
index 9e74036..3a9d3d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -175,46 +175,4 @@ public class KeyGroupRange implements Iterable<Integer>, Serializable {
return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP;
}
- /**
- * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
- * parallelism.
- *
- * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
- * to go beyond this boundary, this method must perform arithmetic on long values.
- *
- * @param maxParallelism Maximal parallelism that the job was initially created with.
- * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
- * @param operatorIndex Id of a key-group. 0 <= keyGroupID < maxParallelism.
- * @return
- */
- public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
- int maxParallelism,
- int parallelism,
- int operatorIndex) {
- Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
- Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
- Preconditions.checkArgument(maxParallelism <= Short.MAX_VALUE, "Maximum parallelism must be smaller than Short.MAX_VALUE.");
-
- int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
- int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
- return new KeyGroupRange(start, end);
- }
-
- /**
- * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
- * parallelism.
- *
- * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
- * to go beyond this boundary, this method must perform arithmetic on long values.
- *
- * @param maxParallelism Maximal parallelism that the job was initially created with.
- * 0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
- * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
- * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism.
- * @return The index of the operator to which elements from the given key-group should be routed under the given
- * parallelism and maxParallelism.
- */
- public static final int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
- return keyGroupId * parallelism / maxParallelism;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
new file mode 100644
index 0000000..eceb6f4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+public final class KeyGroupRangeAssignment {
+
+ public static final int DEFAULT_MAX_PARALLELISM = 128;
+
+ private KeyGroupRangeAssignment() {
+ throw new AssertionError();
+ }
+
+ /**
+ * Assigns the given key to a parallel operator index.
+ *
+ * @param key the key to assign
+ * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+ * @param parallelism the current parallelism of the operator
+ * @return the index of the parallel operator to which the given key should be routed.
+ */
+ public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
+ return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
+ }
+
+ /**
+ * Assigns the given key to a key-group index.
+ *
+ * @param key the key to assign
+ * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
+ * @return the key-group to which the given key is assigned
+ */
+ public static final int assignToKeyGroup(Object key, int maxParallelism) {
+ return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
+ }
+
+ /**
+ * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
+ * parallelism.
+ *
+ * IMPORTANT: maxParallelism must be <= 2^15 (enforced by the precondition below) to avoid overflow in the int
+ * arithmetic of this method. If we ever want to go beyond this boundary, this method must perform arithmetic on long values.
+ *
+ * @param maxParallelism Maximal parallelism that the job was initially created with.
+ * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
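+ * @param operatorIndex Index of the parallel operator instance. 0 <= operatorIndex < parallelism.
+ * @return The range of key-groups that is assigned to the operator with the given index.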
+ */
+ public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+ int maxParallelism,
+ int parallelism,
+ int operatorIndex) {
+ Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
+ Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
+ Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
+
+ int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
+ int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
+ return new KeyGroupRange(start, end);
+ }
+
+ /**
+ * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
+ * parallelism.
+ *
+ * IMPORTANT: maxParallelism must be <= 2^15 to avoid overflow in the int arithmetic of this method. If we ever want
+ * to go beyond this boundary, this method must perform arithmetic on long values.
+ *
+ * @param maxParallelism Maximal parallelism that the job was initially created with.
+ * 0 < parallelism <= maxParallelism <= 2^15 must hold.
+ * @param parallelism The current parallelism under which the job runs. Must be <= maxParallelism.
+ * @param keyGroupId Id of a key-group. 0 <= keyGroupID < maxParallelism.
+ * @return The index of the operator to which elements from the given key-group should be routed under the given
+ * parallelism and maxParallelism.
+ */
+ public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
+ return keyGroupId * parallelism / maxParallelism;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
index 2d1d25c..bf9018e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
@@ -69,8 +68,8 @@ public abstract class KeyedStateBackend<K> {
@SuppressWarnings("rawtypes")
private KvState lastState;
- /** KeyGroupAssigner which determines the key group for each keys */
- protected final KeyGroupAssigner<K> keyGroupAssigner;
+ /** The number of key-groups aka max parallelism */
+ protected final int numberOfKeyGroups;
/** Range of key-groups for which this backend is responsible */
protected final KeyGroupRange keyGroupRange;
@@ -81,12 +80,12 @@ public abstract class KeyedStateBackend<K> {
public KeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange) {
this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
- this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+ // numberOfKeyGroups is a primitive int: a null check would only box it and could never fail,
+ // so validate the value range instead.
+ Preconditions.checkArgument(numberOfKeyGroups >= 1, "The number of key-groups must be at least 1.");
+ this.numberOfKeyGroups = numberOfKeyGroups;
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
}
@@ -157,7 +156,7 @@ public abstract class KeyedStateBackend<K> {
*/
public void setCurrentKey(K newKey) {
this.currentKey = newKey;
- this.currentKeyGroup = keyGroupAssigner.getKeyGroupIndex(newKey);
+ this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
}
/**
@@ -179,11 +178,7 @@ public abstract class KeyedStateBackend<K> {
}
public int getNumberOfKeyGroups() {
- return keyGroupAssigner.getNumberKeyGroups();
- }
-
- public KeyGroupAssigner<K> getKeyGroupAssigner() {
- return keyGroupAssigner;
+ return numberOfKeyGroups;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 5495244..6d92a4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.filesystem;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -181,13 +180,13 @@ public class FsStateBackend extends AbstractStateBackend {
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws Exception {
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange);
}
@@ -197,14 +196,14 @@ public class FsStateBackend extends AbstractStateBackend {
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
restoredState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
index 9863c93..18d1bc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapState.java
@@ -24,9 +24,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.util.Preconditions;
import java.util.HashMap;
@@ -138,7 +138,7 @@ public abstract class AbstractHeapState<K, N, SV, S extends State, SD extends St
Preconditions.checkState(key != null, "No key given.");
Map<N, Map<K, SV>> namespaceMap =
- stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+ stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
if (namespaceMap == null) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index fcb4bef..8d13941 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.state.heap;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -76,20 +75,20 @@ public class HeapKeyedStateBackend<K> extends KeyedStateBackend<K> {
public HeapKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange) {
- super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+ super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
LOG.info("Initializing heap keyed state backend with stream factory.");
}
public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState) throws Exception {
- super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
+ super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
LOG.info("Initializing heap keyed state backend from snapshot.");
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
index 4c65c25..9552325 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.util.Preconditions;
@@ -119,7 +120,7 @@ public class HeapListState<K, N, V>
Preconditions.checkState(key != null, "No key given.");
Map<N, Map<K, ArrayList<V>>> namespaceMap =
- stateTable.get(backend.getKeyGroupAssigner().getKeyGroupIndex(key));
+ stateTable.get(KeyGroupRangeAssignment.assignToKeyGroup(key, backend.getNumberOfKeyGroups()));
if (namespaceMap == null) {
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 654c367..179dfe7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state.memory;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -80,14 +79,14 @@ public class MemoryStateBackend extends AbstractStateBackend {
public <K> KeyedStateBackend<K> createKeyedStateBackend(
Environment env, JobID jobID,
String operatorIdentifier, TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws IOException {
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange);
}
@@ -96,7 +95,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
Environment env, JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -104,7 +103,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
return new HeapKeyedStateBackend<>(
kvStateRegistry,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
restoredState);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 495dced..4972c51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
@@ -2458,7 +2459,7 @@ public class CheckpointCoordinatorTest {
private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
List<KeyGroupRange> ranges = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism);
for (int i = 0; i < maxParallelism; ++i) {
- KeyGroupRange range = ranges.get(KeyGroupRange.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, i));
+ KeyGroupRange range = ranges.get(KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, i));
if (!range.contains(i)) {
Assert.fail("Could not find expected key-group " + i + " in range " + range);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
index 3380907..405f962 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.query.netty.KvStateClient;
import org.apache.flink.runtime.query.netty.KvStateServer;
import org.apache.flink.runtime.query.netty.UnknownKvStateID;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -232,6 +231,7 @@ public class QueryableStateClientTest {
// Config
int numServers = 2;
int numKeys = 1024;
+ int numKeyGroups = 1;
JobID jobId = new JobID();
JobVertexID jobVertexId = new JobVertexID();
@@ -250,7 +250,7 @@ public class QueryableStateClientTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
index 796481c..f785174 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequest;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KvState;
@@ -533,6 +532,8 @@ public class KvStateClientTest {
final int batchSize = 16;
+ final int numKeyGroups = 1;
+
AbstractStateBackend abstractBackend = new MemoryStateBackend();
KvStateRegistry dummyRegistry = new KvStateRegistry();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
@@ -542,7 +543,7 @@ public class KvStateClientTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
index 3d2e8b5..52c807f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java
@@ -39,7 +39,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KvState;
@@ -89,6 +88,7 @@ public class KvStateServerHandlerTest {
ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE, null);
desc.setQueryable("vanilla");
+ int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
dummyEnv.setKvStateRegistry(registry);
@@ -97,7 +97,7 @@ public class KvStateServerHandlerTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
@@ -200,6 +200,7 @@ public class KvStateServerHandlerTest {
KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+ int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
dummyEnv.setKvStateRegistry(registry);
@@ -208,7 +209,7 @@ public class KvStateServerHandlerTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
@@ -346,6 +347,7 @@ public class KvStateServerHandlerTest {
KvStateServerHandler handler = new KvStateServerHandler(registry, closedExecutor, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+ int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
dummyEnv.setKvStateRegistry(registry);
@@ -354,7 +356,7 @@ public class KvStateServerHandlerTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
@@ -484,6 +486,7 @@ public class KvStateServerHandlerTest {
KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+ int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
dummyEnv.setKvStateRegistry(registry);
@@ -492,7 +495,7 @@ public class KvStateServerHandlerTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
@@ -579,6 +582,7 @@ public class KvStateServerHandlerTest {
KvStateServerHandler handler = new KvStateServerHandler(registry, TEST_THREAD_POOL, stats);
EmbeddedChannel channel = new EmbeddedChannel(getFrameDecoder(), handler);
+ int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
dummyEnv.setKvStateRegistry(registry);
@@ -587,7 +591,7 @@ public class KvStateServerHandlerTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
index 30d91b6..e92fb10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.query.netty.message.KvStateRequestResult;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestType;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
@@ -80,7 +79,6 @@ public class KvStateServerTest {
public void testSimpleRequest() throws Exception {
KvStateServer server = null;
Bootstrap bootstrap = null;
-
try {
KvStateRegistry registry = new KvStateRegistry();
KvStateRequestStats stats = new AtomicKvStateRequestStats();
@@ -89,7 +87,7 @@ public class KvStateServerTest {
server.start();
KvStateServerAddress serverAddress = server.getAddress();
-
+ int numKeyGroups = 1;
AbstractStateBackend abstractBackend = new MemoryStateBackend();
DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
dummyEnv.setKvStateRegistry(registry);
@@ -98,7 +96,7 @@ public class KvStateServerTest {
new JobID(),
"test_op",
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(new JobID(), new JobVertexID()));
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index f094bd5..5984aca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
@@ -41,7 +40,6 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateRegistry;
@@ -56,7 +54,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import static org.junit.Assert.assertEquals;
@@ -90,14 +87,14 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
protected <K> KeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
return createKeyedBackend(
keySerializer,
- new HashKeyGroupAssigner<K>(10),
+ 10,
new KeyGroupRange(0, 9),
env);
}
protected <K> KeyedStateBackend<K> createKeyedBackend(
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
Environment env) throws Exception {
return getStateBackend().createKeyedStateBackend(
@@ -105,7 +102,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
new JobID(),
"test_op",
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
env.getTaskKvStateRegistry());
}
@@ -120,7 +117,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
Environment env) throws Exception {
return restoreKeyedBackend(
keySerializer,
- new HashKeyGroupAssigner<K>(10),
+ 10,
new KeyGroupRange(0, 9),
Collections.singletonList(state),
env);
@@ -128,7 +125,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
protected <K> KeyedStateBackend<K> restoreKeyedBackend(
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> state,
Environment env) throws Exception {
@@ -137,7 +134,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
new JobID(),
"test_op",
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
state,
env.getTaskKvStateRegistry());
@@ -243,7 +240,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
KeyedStateBackend<Integer> backend = createKeyedBackend(
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ 1,
new KeyGroupRange(0, 0),
new DummyEnvironment("test_op", 1, 0));
@@ -277,7 +274,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.close();
backend = restoreKeyedBackend(
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ 1,
new KeyGroupRange(0, 0),
Collections.singletonList(snapshot1),
new DummyEnvironment("test_op", 1, 0));
@@ -675,12 +672,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
final int MAX_PARALLELISM = 10;
CheckpointStreamFactory streamFactory = createStreamFactory();
-
- HashKeyGroupAssigner<Integer> keyGroupAssigner = new HashKeyGroupAssigner<>(10);
-
KeyedStateBackend<Integer> backend = createKeyedBackend(
IntSerializer.INSTANCE,
- keyGroupAssigner,
+ MAX_PARALLELISM,
new KeyGroupRange(0, MAX_PARALLELISM - 1),
new DummyEnvironment("test", 1, 0));
@@ -695,12 +689,12 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
Random rand = new Random(0);
// for each key, determine into which half of the key-group space they fall
- int firstKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInFirstHalf) * 2 / MAX_PARALLELISM;
- int secondKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM;
+ int firstKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInFirstHalf, MAX_PARALLELISM, 2);
+ int secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInSecondHalf, MAX_PARALLELISM, 2);
// re-roll the second key until the two keys land in different halves
while (firstKeyHalf == secondKeyHalf) {
keyInSecondHalf = rand.nextInt();
- secondKeyHalf = keyGroupAssigner.getKeyGroupIndex(keyInSecondHalf) * 2 / MAX_PARALLELISM;
+ secondKeyHalf = KeyGroupRangeAssignment.assignKeyToParallelOperator(keyInSecondHalf, MAX_PARALLELISM, 2);
}
backend.setCurrentKey(keyInFirstHalf);
@@ -714,18 +708,18 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
List<KeyGroupsStateHandle> firstHalfKeyGroupStates = CheckpointCoordinator.getKeyGroupsStateHandles(
Collections.singletonList(snapshot),
- new KeyGroupRange(0, 4));
+ KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 0));
List<KeyGroupsStateHandle> secondHalfKeyGroupStates = CheckpointCoordinator.getKeyGroupsStateHandles(
Collections.singletonList(snapshot),
- new KeyGroupRange(5, 9));
+ KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(MAX_PARALLELISM, 2, 1));
backend.close();
// backend for the first half of the key group range
KeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend(
IntSerializer.INSTANCE,
- keyGroupAssigner,
+ MAX_PARALLELISM,
new KeyGroupRange(0, 4),
firstHalfKeyGroupStates,
new DummyEnvironment("test", 1, 0));
@@ -733,7 +727,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
// backend for the second half of the key group range
KeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend(
IntSerializer.INSTANCE,
- keyGroupAssigner,
+ MAX_PARALLELISM,
new KeyGroupRange(5, 9),
secondHalfKeyGroupStates,
new DummyEnvironment("test", 1, 0));
@@ -978,9 +972,10 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
*/
@SuppressWarnings("unchecked")
protected void testConcurrentMapIfQueryable() throws Exception {
+ final int numberOfKeyGroups = 1;
KeyedStateBackend<Integer> backend = createKeyedBackend(
IntSerializer.INSTANCE,
- new HashKeyGroupAssigner<Integer>(1),
+ numberOfKeyGroups,
new KeyGroupRange(0, 0),
new DummyEnvironment("test_op", 1, 0));
@@ -1005,7 +1000,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.setCurrentKey(1);
state.update(121818273);
- int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+ int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
assertNotNull("State not set", stateTable.get(keyGroupIndex));
assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
@@ -1031,7 +1026,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.setCurrentKey(1);
state.add(121818273);
- int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+ int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
assertNotNull("State not set", stateTable.get(keyGroupIndex));
assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
@@ -1062,7 +1057,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.setCurrentKey(1);
state.add(121818273);
- int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+ int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
assertNotNull("State not set", stateTable.get(keyGroupIndex));
assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
@@ -1093,7 +1088,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
backend.setCurrentKey(1);
state.add(121818273);
- int keyGroupIndex = new HashKeyGroupAssigner<>(1).getKeyGroupIndex(1);
+ int keyGroupIndex = KeyGroupRangeAssignment.assignToKeyGroup(1, numberOfKeyGroups);
StateTable stateTable = ((AbstractHeapState) kvState).getStateTable();
assertNotNull("State not set", stateTable.get(keyGroupIndex));
assertTrue(stateTable.get(keyGroupIndex) instanceof ConcurrentHashMap);
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1fb34b8..af907e3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -111,8 +111,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
new PartitionTransformation<>(
dataStream.getTransformation(),
new KeyGroupStreamPartitioner<>(
- keySelector,
- new HashKeyGroupAssigner<KEY>())));
+ keySelector, KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM)));
this.keySelector = keySelector;
this.keyType = keyType;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 1a92ba4..8e807db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
@@ -76,8 +75,7 @@ public class StreamConfig implements Serializable {
private static final String STATE_BACKEND = "statebackend";
private static final String STATE_PARTITIONER = "statePartitioner";
- /** key for the {@link KeyGroupAssigner} for key to key group index mappings */
- private static final String KEY_GROUP_ASSIGNER = "keyGroupAssigner";
+ private static final String NUMBER_OF_KEY_GROUPS = "numberOfKeyGroups";
private static final String STATE_KEY_SERIALIZER = "statekeyser";
@@ -445,28 +443,26 @@ public class StreamConfig implements Serializable {
}
/**
- * Sets the {@link KeyGroupAssigner} to be used for the current {@link StreamOperator}.
+ * Sets the number of key-groups to be used for the current {@link StreamOperator}.
*
- * @param keyGroupAssigner Key group assigner to be used
+ * @param numberOfKeyGroups Number of key-groups to be used
*/
- public void setKeyGroupAssigner(KeyGroupAssigner<?> keyGroupAssigner) {
+ public void setNumberOfKeyGroups(int numberOfKeyGroups) {
try {
- InstantiationUtil.writeObjectToConfig(keyGroupAssigner, this.config, KEY_GROUP_ASSIGNER);
+ InstantiationUtil.writeObjectToConfig(numberOfKeyGroups, this.config, NUMBER_OF_KEY_GROUPS);
} catch (Exception e) {
throw new StreamTaskException("Could not serialize virtual state partitioner.", e);
}
}
/**
- * Gets the {@link KeyGroupAssigner} for the {@link StreamOperator}.
+ * Gets the number of key-groups for the {@link StreamOperator}.
*
- * @param classLoader Classloader to be used for the deserialization
- * @param <K> Type of the keys to be assigned to key groups
- * @return Key group assigner
+ * @return the number of key-groups
*/
- public <K> KeyGroupAssigner<K> getKeyGroupAssigner(ClassLoader classLoader) {
+ public Integer getNumberOfKeyGroups(ClassLoader cl) {
try {
- return InstantiationUtil.readObjectFromConfig(this.config, KEY_GROUP_ASSIGNER, classLoader);
+ return InstantiationUtil.readObjectFromConfig(this.config, NUMBER_OF_KEY_GROUPS, cl);
} catch (Exception e) {
throw new StreamTaskException("Could not instantiate virtual state partitioner.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index f9f26e9..506b664 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
+import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,24 +147,24 @@ public class StreamGraphGenerator {
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
+
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig. If this value has not been specified either, then use the
// parallelism of the operator.
int maxParallelism = env.getConfig().getMaxParallelism();
if (maxParallelism <= 0) {
- maxParallelism = transform.getParallelism();
- /**
- * TODO: Remove once the parallelism settings works properly in Flink (FLINK-3885)
- * Currently, the parallelism will be set to 1 on the JobManager iff it encounters
- * a negative parallelism value. We need to know this for the
- * KeyGroupStreamPartitioner on the client-side. Thus, we already set the value to
- * 1 here.
- */
- if (maxParallelism <= 0) {
- transform.setParallelism(1);
- maxParallelism = 1;
+
+ int parallelism = transform.getParallelism();
+
+ if(parallelism <= 0) {
+ parallelism = 1;
+ transform.setParallelism(parallelism);
}
+
+ maxParallelism = Math.max(
+ MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)),
+ KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM);
}
transform.setMaxParallelism(maxParallelism);
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 76fdaca..a024895 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,7 +26,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -357,12 +355,9 @@ public class StreamingJobGraphGenerator {
config.setStatePartitioner(1, vertex.getStatePartitioner2());
config.setStateKeySerializer(vertex.getStateKeySerializer());
- // only set the key group assigner if the vertex uses partitioned state (= KeyedStream).
+ // only set the max parallelism if the vertex uses partitioned state (= KeyedStream).
if (vertex.getStatePartitioner1() != null) {
- // the key group assigner has to know the number of key groups (= maxParallelism)
- KeyGroupAssigner<Object> keyGroupAssigner = new HashKeyGroupAssigner<Object>(vertex.getMaxParallelism());
-
- config.setKeyGroupAssigner(keyGroupAssigner);
+ config.setNumberOfKeyGroups(vertex.getMaxParallelism());
}
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 718c0c7..71296e3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -133,14 +134,14 @@ public abstract class AbstractStreamOperator<OUT>
if (null != keySerializer) {
ExecutionConfig execConf = container.getEnvironment().getExecutionConfig();;
- KeyGroupRange subTaskKeyGroupRange = KeyGroupRange.computeKeyGroupRangeForOperatorIndex(
+ KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
container.getIndexInSubtaskGroup());
keyedStateBackend = container.createKeyedStateBackend(
keySerializer,
- container.getConfiguration().getKeyGroupAssigner(getUserCodeClassloader()),
+ container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
subTaskKeyGroupRange);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 108a3ae..256fee1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -18,16 +18,14 @@
package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
/**
- * Partitioner selects the target channel based on the key group index. The key group
- * index is derived from the key of the elements using the {@link KeyGroupAssigner}.
+ * Partitioner selects the target channel based on the key group index.
*
* @param <T> Type of the elements in the Stream being partitioned
*/
@@ -39,15 +37,16 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
private final KeySelector<T, K> keySelector;
- private final KeyGroupAssigner<K> keyGroupAssigner;
+ private int maxParallelism;
- public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, KeyGroupAssigner<K> keyGroupAssigner) {
+ public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
this.keySelector = Preconditions.checkNotNull(keySelector);
- this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+ this.maxParallelism = maxParallelism;
}
- public KeyGroupAssigner<K> getKeyGroupAssigner() {
- return keyGroupAssigner;
+ public int getMaxParallelism() {
+ return maxParallelism;
}
@Override
@@ -61,10 +60,7 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
- returnArray[0] = KeyGroupRange.computeOperatorIndexForKeyGroup(
- keyGroupAssigner.getNumberKeyGroups(),
- numberOfOutputChannels,
- keyGroupAssigner.getKeyGroupIndex(key));
+ returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);
return returnArray;
}
@@ -80,6 +76,6 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
@Override
public void configure(int maxParallelism) {
- keyGroupAssigner.setup(maxParallelism);
+ this.maxParallelism = maxParallelism;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 13f650c..701281b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -762,7 +761,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
public <K> KeyedStateBackend<K> createKeyedStateBackend(
TypeSerializer<K> keySerializer,
- KeyGroupAssigner<K> keyGroupAssigner,
+ int numberOfKeyGroups,
KeyGroupRange keyGroupRange) throws Exception {
if (keyedStateBackend != null) {
@@ -779,7 +778,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
getEnvironment().getJobID(),
operatorIdentifier,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
lazyRestoreKeyGroupStates,
getEnvironment().getTaskKvStateRegistry());
@@ -791,7 +790,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
getEnvironment().getJobID(),
operatorIdentifier,
keySerializer,
- keyGroupAssigner,
+ numberOfKeyGroups,
keyGroupRange,
getEnvironment().getTaskKvStateRegistry());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6d430618/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 06d381f..874274f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -19,11 +19,9 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -270,10 +268,6 @@ public class StreamGraphGeneratorTest {
StreamNode keyedResultNode = graph.getStreamNode(keyedResult.getId());
StreamPartitioner<?> streamPartitioner = keyedResultNode.getInEdges().get(0).getPartitioner();
-
- HashKeyGroupAssigner<?> hashKeyGroupAssigner = extractHashKeyGroupAssigner(streamPartitioner);
-
- assertEquals(maxParallelism, hashKeyGroupAssigner.getNumberKeyGroups());
}
/**
@@ -384,7 +378,7 @@ public class StreamGraphGeneratorTest {
}
/**
- * Tests that the max parallelism and the key group partitioner is properly set for connected
+ * Tests that the max parallelism is properly set for connected
* streams.
*/
@Test
@@ -423,24 +417,6 @@ public class StreamGraphGeneratorTest {
StreamPartitioner<?> streamPartitioner1 = keyedResultNode.getInEdges().get(0).getPartitioner();
StreamPartitioner<?> streamPartitioner2 = keyedResultNode.getInEdges().get(1).getPartitioner();
-
- HashKeyGroupAssigner<?> hashKeyGroupAssigner1 = extractHashKeyGroupAssigner(streamPartitioner1);
- assertEquals(maxParallelism, hashKeyGroupAssigner1.getNumberKeyGroups());
-
- HashKeyGroupAssigner<?> hashKeyGroupAssigner2 = extractHashKeyGroupAssigner(streamPartitioner2);
- assertEquals(maxParallelism, hashKeyGroupAssigner2.getNumberKeyGroups());
- }
-
- private HashKeyGroupAssigner<?> extractHashKeyGroupAssigner(StreamPartitioner<?> streamPartitioner) {
- assertTrue(streamPartitioner instanceof KeyGroupStreamPartitioner);
-
- KeyGroupStreamPartitioner<?, ?> keyGroupStreamPartitioner = (KeyGroupStreamPartitioner<?, ?>) streamPartitioner;
-
- KeyGroupAssigner<?> keyGroupAssigner = keyGroupStreamPartitioner.getKeyGroupAssigner();
-
- assertTrue(keyGroupAssigner instanceof HashKeyGroupAssigner);
-
- return (HashKeyGroupAssigner<?>) keyGroupAssigner;
}
private static class OutputTypeConfigurableOperationWithTwoInputs