You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:21 UTC
[03/27] flink git commit: [FLINK-4380] Introduce KeyGroupAssigner and
Max-Parallelism Parameter
[FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
This introduces a new KeySelector that assigns keys to key groups and
also adds the max parallelism parameter throughout all API levels.
This also adds tests for the newly introduced features.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec975aab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec975aab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec975aab
Branch: refs/heads/master
Commit: ec975aaba79449bd93020f296b05ea509ea57bdc
Parents: 7cd9bb5
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Jul 28 15:08:24 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Aug 31 19:04:31 2016 +0200
----------------------------------------------------------------------
docs/dev/api_concepts.md | 2 +
.../flink-statebackend-rocksdb/pom.xml | 7 +
.../src/test/resources/log4j-test.properties | 2 +-
.../src/test/resources/log4j.properties | 2 +-
.../flink/api/common/ExecutionConfig.java | 51 +-
.../api/common/state/KeyGroupAssigner.java | 47 ++
.../flink/api/common/state/StateDescriptor.java | 1 +
.../checkpoint/CheckpointCoordinator.java | 6 +-
.../InputChannelDeploymentDescriptor.java | 1 +
.../InputGateDeploymentDescriptor.java | 1 +
.../deployment/ResultPartitionLocation.java | 1 +
.../runtime/executiongraph/ExecutionGraph.java | 2 -
.../executiongraph/ExecutionJobVertex.java | 35 +-
.../runtime/executiongraph/ExecutionVertex.java | 4 +
.../flink/runtime/jobgraph/JobVertex.java | 30 +-
.../runtime/state/HashKeyGroupAssigner.java | 66 ++
.../flink/runtime/jobmanager/JobManager.scala | 1 -
.../checkpoint/CheckpointCoordinatorTest.java | 17 -
.../checkpoint/CheckpointStateRestoreTest.java | 5 +-
...ExecutionGraphCheckpointCoordinatorTest.java | 1 -
.../scheduler/SchedulerTestUtils.java | 2 +
.../testutils/DummyEnvironment.java.orig | 185 -----
.../streaming/api/datastream/KeyedStream.java | 12 +-
.../datastream/SingleOutputStreamOperator.java | 19 +
.../environment/StreamExecutionEnvironment.java | 42 +-
.../flink/streaming/api/graph/StreamConfig.java | 49 +-
.../flink/streaming/api/graph/StreamGraph.java | 33 +-
.../api/graph/StreamGraphGenerator.java | 43 +-
.../flink/streaming/api/graph/StreamNode.java | 25 +
.../api/graph/StreamingJobGraphGenerator.java | 31 +-
.../transformations/StreamTransformation.java | 24 +
.../ConfigurableStreamPartitioner.java | 39 ++
.../runtime/partitioner/HashPartitioner.java | 66 --
.../partitioner/KeyGroupStreamPartitioner.java | 82 +++
.../streaming/api/AggregationFunctionTest.java | 8 +-
.../flink/streaming/api/DataStreamTest.java | 8 +-
.../streaming/api/RestartStrategyTest.java | 4 +-
.../streaming/api/graph/SlotAllocationTest.java | 3 +-
.../api/graph/StreamGraphGeneratorTest.java | 220 ++++++
.../graph/StreamingJobGraphGeneratorTest.java | 216 +++++-
.../StreamingJobGraphGeneratorNodeHashTest.java | 64 +-
.../windowing/AllWindowTranslationTest.java | 4 +
.../windowing/WindowTranslationTest.java | 10 +
.../partitioner/HashPartitionerTest.java | 71 --
.../KeyGroupStreamPartitionerTest.java | 74 ++
.../partitioner/RescalePartitionerTest.java | 4 +
.../tasks/OneInputStreamTaskTestHarness.java | 2 +
.../streaming/runtime/tasks/StreamTaskTest.java | 176 +++--
.../runtime/tasks/StreamTaskTestHarness.java | 37 +-
.../src/test/resources/log4j-test.properties | 2 +-
.../flink/streaming/api/scala/DataStream.scala | 11 +
.../api/scala/StreamExecutionEnvironment.scala | 18 +
.../streaming/api/scala/DataStreamTest.scala | 2 +-
.../EventTimeAllWindowCheckpointingITCase.java | 2 +-
.../test/checkpointing/RescalingITCase.java | 683 +++++++++++++++++++
.../streaming/api/StreamingOperatorsITCase.java | 17 +
.../streaming/runtime/DataStreamPojoITCase.java | 2 +
.../test/streaming/runtime/TimestampITCase.java | 1 +
.../flink/test/web/WebFrontendITCase.java | 3 +-
59 files changed, 2074 insertions(+), 502 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/docs/dev/api_concepts.md
----------------------------------------------------------------------
diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md
index 468085a..b0c034c 100644
--- a/docs/dev/api_concepts.md
+++ b/docs/dev/api_concepts.md
@@ -960,6 +960,8 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
- `getParallelism()` / `setParallelism(int parallelism)` Set the default parallelism for the job.
+- `getMaxParallelism()` / `setMaxParallelism(int parallelism)` Set the default maximum parallelism for the job. This setting determines the maximum degree of parallelism and specifies the upper limit for dynamic scaling.
+
- `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more.
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-contrib/flink-statebackend-rocksdb/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/pom.xml b/flink-contrib/flink-statebackend-rocksdb/pom.xml
index efd673c..314b83f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/pom.xml
+++ b/flink-contrib/flink-statebackend-rocksdb/pom.xml
@@ -69,6 +69,13 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
index 0b686e5..881dc06 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
index ed2bbcb..4a30c6f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/resources/log4j.properties
@@ -24,4 +24,4 @@ log4j.rootLogger=OFF, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target = System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 5b69794..81ee930 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -74,6 +74,13 @@ public class ExecutionConfig implements Serializable {
*/
public static final int PARALLELISM_DEFAULT = -1;
+ /**
+ * The flag value indicating an unknown or unset parallelism. This value is
+ * not a valid parallelism and indicates that the parallelism should remain
+ * unchanged.
+ */
+ public static final int PARALLELISM_UNKNOWN = -2;
+
private static final long DEFAULT_RESTART_DELAY = 10000L;
// --------------------------------------------------------------------------------------------
@@ -86,6 +93,13 @@ public class ExecutionConfig implements Serializable {
private int parallelism = PARALLELISM_DEFAULT;
/**
+ * The program wide maximum parallelism used for operators which haven't specified a maximum
+ * parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the
+ * number of key groups used for partitioned state.
+ */
+ private int maxParallelism = -1;
+
+ /**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
@Deprecated
@@ -219,12 +233,41 @@ public class ExecutionConfig implements Serializable {
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
- Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
- "The parallelism of an operator must be at least 1.");
+ if (parallelism != PARALLELISM_UNKNOWN) {
+ if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
+ throw new IllegalArgumentException(
+ "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
+ }
+ this.parallelism = parallelism;
+ }
+ return this;
+ }
- this.parallelism = parallelism;
+ /**
+ * Gets the maximum degree of parallelism defined for the program.
+ *
+ * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+ * defines the number of key groups used for partitioned state.
+ *
+ * @return Maximum degree of parallelism
+ */
+ @PublicEvolving
+ public int getMaxParallelism() {
+ return maxParallelism;
+ }
- return this;
+ /**
+ * Sets the maximum degree of parallelism defined for the program.
+ *
+ * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+ * defines the number of key groups used for partitioned state.
+ *
+ * @param maxParallelism Maximum degree of parallelism to be used for the program.
+ */
+ @PublicEvolving
+ public void setMaxParallelism(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+ this.maxParallelism = maxParallelism;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
new file mode 100644
index 0000000..23463e1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.Serializable;
+
+/**
+ * Assigns a key to a key group index. A key group is the smallest unit of partitioned state
+ * which is assigned to an operator. An operator can be assigned multiple key groups.
+ *
+ * @param <K> Type of the key
+ */
+@Internal
+public interface KeyGroupAssigner<K> extends Serializable {
+ /**
+ * Calculates the key group index for the given key.
+ *
+ * @param key Key to be used
+ * @return Key group index for the given key
+ */
+ int getKeyGroupIndex(K key);
+
+ /**
+ * Sets up the key group assigner with the maximum parallelism (= number of key groups).
+ *
+ * @param maxParallelism Maximum parallelism (= number of key groups)
+ */
+ void setup(int maxParallelism);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index d99f4de..483c954 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -44,6 +44,7 @@ import static java.util.Objects.requireNonNull;
* <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
*
* @param <S> The type of the State objects created from this {@code StateDescriptor}.
+ * @param <T> The type of the value of the state object described by this state descriptor.
*/
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3619f48..24fca5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -155,8 +155,6 @@ public class CheckpointCoordinator {
/** Helper for tracking checkpoint statistics */
private final CheckpointStatsTracker statsTracker;
- private final int numberKeyGroups;
-
// --------------------------------------------------------------------------------------------
public CheckpointCoordinator(
@@ -165,7 +163,6 @@ public class CheckpointCoordinator {
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
- int numberKeyGroups,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
@@ -202,7 +199,6 @@ public class CheckpointCoordinator {
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = checkNotNull(userClassLoader);
this.statsTracker = checkNotNull(statsTracker);
- this.numberKeyGroups = numberKeyGroups;
this.timer = new Timer("Checkpoint Timer", true);
@@ -797,7 +793,7 @@ public class CheckpointCoordinator {
int counter = 0;
- List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(numberKeyGroups, executionJobVertex.getParallelism());
+ List<Set<Integer>> keyGroupPartitions = createKeyGroupPartitions(executionJobVertex.getMaxParallelism(), executionJobVertex.getParallelism());
for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
SubtaskState subtaskState = taskState.getState(i);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index e00a480..f31febb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -48,6 +48,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class InputChannelDeploymentDescriptor implements Serializable {
+ private static final long serialVersionUID = 373711381640454080L;
private static Logger LOG = LoggerFactory.getLogger(InputChannelDeploymentDescriptor.class);
/** The ID of the partition the input channel is going to consume. */
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index ec4bd40..dde1ed7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -38,6 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class InputGateDeploymentDescriptor implements Serializable {
+ private static final long serialVersionUID = -7143441863165366704L;
/**
* The ID of the consumed intermediate result. Each input gate consumes partitions of the
* intermediate result specified by this ID. This ID also identifies the input gate at the
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
index ca63e6b..895bea0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionLocation.java
@@ -47,6 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ResultPartitionLocation implements Serializable {
+ private static final long serialVersionUID = -6354238166937194463L;
/** The type of location for the result partition. */
private final LocationType locationType;
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d7e40a3..92cab41 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -340,7 +340,6 @@ public class ExecutionGraph {
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
- int numberKeyGroups,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
@@ -373,7 +372,6 @@ public class ExecutionGraph {
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
- numberKeyGroups,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6272151..d3dc8fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.SerializableObject;
-
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
@@ -69,6 +69,8 @@ public class ExecutionJobVertex {
private final List<IntermediateResult> inputs;
private final int parallelism;
+
+ private final int maxParallelism;
private final boolean[] finishedSubtasks;
@@ -81,16 +83,23 @@ public class ExecutionJobVertex {
private final InputSplit[] inputSplits;
private InputSplitAssigner splitAssigner;
+
+ public ExecutionJobVertex(
+ ExecutionGraph graph,
+ JobVertex jobVertex,
+ int defaultParallelism,
+ FiniteDuration timeout) throws JobException {
- public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
- int defaultParallelism, FiniteDuration timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
}
- public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex,
- int defaultParallelism, FiniteDuration timeout, long createTimestamp)
- throws JobException
- {
+ public ExecutionJobVertex(
+ ExecutionGraph graph,
+ JobVertex jobVertex,
+ int defaultParallelism,
+ FiniteDuration timeout,
+ long createTimestamp) throws JobException {
+
if (graph == null || jobVertex == null) {
throw new NullPointerException();
}
@@ -102,6 +111,14 @@ public class ExecutionJobVertex {
int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
this.parallelism = numTaskVertices;
+
+ int maxParallelism = jobVertex.getMaxParallelism();
+
+ Preconditions.checkArgument(maxParallelism >= parallelism, "The maximum parallelism (" +
+ maxParallelism + ") must be greater or equal than the parallelism (" + parallelism +
+ ").");
+ this.maxParallelism = maxParallelism;
+
this.taskVertices = new ExecutionVertex[numTaskVertices];
this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
@@ -177,6 +194,10 @@ public class ExecutionJobVertex {
return parallelism;
}
+ public int getMaxParallelism() {
+ return maxParallelism;
+ }
+
public JobID getJobId() {
return graph.getJobID();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 2495316..b15f851 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -187,6 +187,10 @@ public class ExecutionVertex {
return this.subTaskIndex;
}
+ public int getMaxParallelism() {
+ return this.jobVertex.getMaxParallelism();
+ }
+
public int getNumberOfInputs() {
return this.inputEdges.length;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 379a42a..4786388 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.jobgraph;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
@@ -31,6 +28,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* The base class for job vertexes.
*/
@@ -57,6 +57,9 @@ public class JobVertex implements java.io.Serializable {
/** Number of subtasks to split this task into at runtime.*/
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
+ /** Maximum number of subtasks to split this task into at runtime. */
+ private int maxParallelism = Short.MAX_VALUE;
+
/** Custom configuration passed to the assigned task at runtime. */
private Configuration configuration;
@@ -234,6 +237,27 @@ public class JobVertex implements java.io.Serializable {
this.parallelism = parallelism;
}
+ /**
+ * Gets the maximum parallelism for the task.
+ *
+ * @return The maximum parallelism for the task.
+ */
+ public int getMaxParallelism() {
+ return maxParallelism;
+ }
+
+ /**
+ * Sets the maximum parallelism for the task.
+ *
+ * @param maxParallelism The maximum parallelism to be set.
+ */
+ public void setMaxParallelism(int maxParallelism) {
+ org.apache.flink.util.Preconditions.checkArgument(
+ maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1.");
+
+ this.maxParallelism = maxParallelism;
+ }
+
public InputSplitSource<?> getInputSplitSource() {
return inputSplitSource;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
new file mode 100644
index 0000000..280746d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Hash based key group assigner. The assigner assigns each key to a key group using the hash value
+ * of the key.
+ *
+ * @param <K> Type of the key
+ */
+public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
+ private static final long serialVersionUID = -6319826921798945448L;
+
+ private static final int UNDEFINED_NUMBER_KEY_GROUPS = Integer.MIN_VALUE;
+
+ private int numberKeyGroups;
+
+ public HashKeyGroupAssigner() {
+ this(UNDEFINED_NUMBER_KEY_GROUPS);
+ }
+
+ public HashKeyGroupAssigner(int numberKeyGroups) {
+ Preconditions.checkArgument(numberKeyGroups > 0 || numberKeyGroups == UNDEFINED_NUMBER_KEY_GROUPS,
+ "The number of key groups has to be greater than 0 or undefined. Use " +
+ "setMaxParallelism() to specify the number of key groups.");
+ this.numberKeyGroups = numberKeyGroups;
+ }
+
+ public int getNumberKeyGroups() {
+ return numberKeyGroups;
+ }
+
+ @Override
+ public int getKeyGroupIndex(K key) {
+ return MathUtils.murmurHash(key.hashCode()) % numberKeyGroups;
+ }
+
+ @Override
+ public void setup(int numberKeyGroups) {
+ Preconditions.checkArgument(numberKeyGroups > 0, "The number of key groups has to be " +
+ "greater than 0. Use setMaxParallelism() to specify the number of key " +
+ "groups.");
+
+ this.numberKeyGroups = numberKeyGroups;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b706a1a..356f1a9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1271,7 +1271,6 @@ class JobManager(
snapshotSettings.getCheckpointTimeout,
snapshotSettings.getMinPauseBetweenCheckpoints,
snapshotSettings.getMaxConcurrentCheckpoints,
- parallelism,
triggerVertices,
ackVertices,
confirmVertices,
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 09c53d6..50330fa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -86,7 +86,6 @@ public class CheckpointCoordinatorTest {
600000,
600000,
0, Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
@@ -140,7 +139,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
@@ -192,7 +190,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {},
@@ -245,7 +242,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
@@ -371,7 +367,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
@@ -493,7 +488,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
@@ -645,7 +639,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex },
@@ -782,7 +775,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex },
@@ -905,7 +897,6 @@ public class CheckpointCoordinatorTest {
200,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex },
@@ -975,7 +966,6 @@ public class CheckpointCoordinatorTest {
200000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex },
@@ -1056,7 +1046,6 @@ public class CheckpointCoordinatorTest {
200000, // timeout is very long (200 s)
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex },
@@ -1149,7 +1138,6 @@ public class CheckpointCoordinatorTest {
200000, // timeout is very long (200 s)
500, // 500ms delay between checkpoints
10,
- 42,
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
new ExecutionVertex[] { vertex1 },
@@ -1235,7 +1223,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
@@ -1374,7 +1361,6 @@ public class CheckpointCoordinatorTest {
600000,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
@@ -1462,7 +1448,6 @@ public class CheckpointCoordinatorTest {
200000, // timeout is very long (200 s)
0L, // no extra delay
maxConcurrentAttempts,
- 42,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -1534,7 +1519,6 @@ public class CheckpointCoordinatorTest {
200000, // timeout is very long (200 s)
0L, // no extra delay
maxConcurrentAttempts, // max two concurrent checkpoints
- 42,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -1615,7 +1599,6 @@ public class CheckpointCoordinatorTest {
200000, // timeout is very long (200 s)
0L, // no extra delay
2, // max two concurrent checkpoints
- 42,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex },
new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 061059a..1816fc9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -90,7 +90,6 @@ public class CheckpointStateRestoreTest {
200000L,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0],
@@ -170,7 +169,6 @@ public class CheckpointStateRestoreTest {
200000L,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0],
@@ -221,7 +219,6 @@ public class CheckpointStateRestoreTest {
200000L,
0,
Integer.MAX_VALUE,
- 42,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[0], cl,
@@ -263,12 +260,14 @@ public class CheckpointStateRestoreTest {
when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
+ when(mock.getMaxParallelism()).thenReturn(parallelism);
return mock;
}
private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
when(vertex.getParallelism()).thenReturn(vertices.length);
+ when(vertex.getMaxParallelism()).thenReturn(vertices.length);
when(vertex.getJobVertexId()).thenReturn(id);
when(vertex.getTaskVertices()).thenReturn(vertices);
return vertex;
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 49a9449..1de7098 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -112,7 +112,6 @@ public class ExecutionGraphCheckpointCoordinatorTest {
100,
100,
1,
- 42,
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
Collections.<ExecutionJobVertex>emptyList(),
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index ed6d1ee..983d6e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -104,6 +104,7 @@ public class SchedulerTestUtils {
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
+ when(vertex.getMaxParallelism()).thenReturn(numTasks);
when(vertex.toString()).thenReturn("TEST-VERTEX");
when(vertex.getSimpleName()).thenReturn("TEST-VERTEX");
@@ -121,6 +122,7 @@ public class SchedulerTestUtils {
when(vertex.getJobvertexId()).thenReturn(jid);
when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks);
+ when(vertex.getMaxParallelism()).thenReturn(numTasks);
when(vertex.toString()).thenReturn("TEST-VERTEX");
Execution execution = mock(Execution.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
deleted file mode 100644
index 393ee4c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java.orig
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-public class DummyEnvironment implements Environment {
-
- private final JobID jobId = new JobID();
- private final JobVertexID jobVertexId = new JobVertexID();
- private final ExecutionAttemptID executionId = new ExecutionAttemptID();
- private final ExecutionConfig executionConfig = new ExecutionConfig();
-<<<<<<< 9a73dbc71b83080b7deccc62b8b6ffa9f102e847
- private final TaskInfo taskInfo;
-=======
- private final KvStateRegistry kvStateRegistry = new KvStateRegistry();
- private final TaskKvStateRegistry taskKvStateRegistry;
->>>>>>> [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
-
- public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) {
- this.taskInfo = new TaskInfo(taskName, subTaskIndex, numSubTasks, 0);
-
- this.taskKvStateRegistry = kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
- }
-
- public KvStateRegistry getKvStateRegistry() {
- return kvStateRegistry;
- }
-
- @Override
- public ExecutionConfig getExecutionConfig() {
- return executionConfig;
- }
-
- @Override
- public JobID getJobID() {
- return jobId;
- }
-
- @Override
- public JobVertexID getJobVertexId() {
- return jobVertexId;
- }
-
- @Override
- public ExecutionAttemptID getExecutionId() {
- return executionId;
- }
-
- @Override
- public Configuration getTaskConfiguration() {
- return new Configuration();
- }
-
- @Override
- public TaskManagerRuntimeInfo getTaskManagerInfo() {
- return null;
- }
-
- @Override
- public TaskMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup();
- }
-
- @Override
- public Configuration getJobConfiguration() {
- return new Configuration();
- }
-
- @Override
- public TaskInfo getTaskInfo() {
- return taskInfo;
- }
-
- @Override
- public InputSplitProvider getInputSplitProvider() {
- return null;
- }
-
- @Override
- public IOManager getIOManager() {
- return null;
- }
-
- @Override
- public MemoryManager getMemoryManager() {
- return null;
- }
-
- @Override
- public ClassLoader getUserClassLoader() {
- return getClass().getClassLoader();
- }
-
- @Override
- public Map<String, Future<Path>> getDistributedCacheEntries() {
- return Collections.emptyMap();
- }
-
- @Override
- public BroadcastVariableManager getBroadcastVariableManager() {
- return null;
- }
-
- @Override
- public AccumulatorRegistry getAccumulatorRegistry() {
- return null;
- }
-
- @Override
-<<<<<<< 9a73dbc71b83080b7deccc62b8b6ffa9f102e847
- public void acknowledgeCheckpoint(long checkpointId) {}
-=======
- public TaskKvStateRegistry getTaskKvStateRegistry() {
- return taskKvStateRegistry;
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId) {
- }
->>>>>>> [FLINK-3779] [runtime] Add KvStateRegistry for queryable KvState
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {}
-
- @Override
- public ResultPartitionWriter getWriter(int index) {
- return null;
- }
-
- @Override
- public ResultPartitionWriter[] getAllWriters() {
- return null;
- }
-
- @Override
- public InputGate getInputGate(int index) {
- return null;
- }
-
- @Override
- public InputGate[] getAllInputGates() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 6998890..1fb34b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -55,7 +56,7 @@ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import java.util.UUID;
@@ -105,8 +106,13 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* Function for determining state partitions
*/
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
- super(dataStream.getExecutionEnvironment(), new PartitionTransformation<>(
- dataStream.getTransformation(), new HashPartitioner<>(keySelector)));
+ super(
+ dataStream.getExecutionEnvironment(),
+ new PartitionTransformation<>(
+ dataStream.getTransformation(),
+ new KeyGroupStreamPartitioner<>(
+ keySelector,
+ new HashKeyGroupAssigner<KEY>())));
this.keySelector = keySelector;
this.keyType = keyType;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 02ea219..614f19b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.util.Preconditions;
import static java.util.Objects.requireNonNull;
@@ -106,6 +107,24 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
}
/**
+ * Sets the maximum parallelism of this operator.
+ *
+ * The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the
+ * number of key groups used for partitioned state.
+ *
+ * @param maxParallelism Maximum parallelism
+ * @return The operator with set maximum parallelism
+ */
+ @PublicEvolving
+ public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+
+ transformation.setMaxParallelism(maxParallelism);
+
+ return this;
+ }
+
+ /**
* Sets the parallelism of this operator to one.
* And mark this operator cannot set a non-1 degree of parallelism.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ead9564..78aab97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -18,16 +18,16 @@
package org.apache.flink.streaming.api.environment;
import com.esotericsoftware.kryo.Serializer;
-
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
@@ -46,22 +46,22 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
-import org.apache.flink.api.common.io.FilePathFilter;
-import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
@@ -69,7 +69,6 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
@@ -168,6 +167,21 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Sets the maximum degree of parallelism defined for the program.
+ *
+ * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+ * defines the number of key groups used for partitioned state.
+ *
+ * @param maxParallelism Maximum degree of parallelism to be used for the program, with {@code 0 < maxParallelism <= 2^15}
+ */
+ public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0 && maxParallelism <= (1 << 15),
+ "maxParallelism is out of bounds 0 < maxParallelism <= 2^15. Found: " + maxParallelism);
+ config.setMaxParallelism(maxParallelism);
+ return this;
+ }
+
+ /**
* Gets the parallelism with which operation are executed by default.
* Operations can individually override this value to use a specific
* parallelism.
@@ -180,6 +194,18 @@ public abstract class StreamExecutionEnvironment {
}
/**
+ * Gets the maximum degree of parallelism defined for the program.
+ *
+ * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
+ * defines the number of key groups used for partitioned state.
+ *
+ * @return Maximum degree of parallelism
+ */
+ public int getMaxParallelism() {
+ return config.getMaxParallelism();
+ }
+
+ /**
* Sets the maximum time frequency (milliseconds) for the flushing of the
* output buffers. By default the output buffers flush frequently to provide
* low latency and to aid smooth developer experience. Setting the parameter
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 783b3e2..1a92ba4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
@@ -74,6 +75,10 @@ public class StreamConfig implements Serializable {
private static final String STATE_BACKEND = "statebackend";
private static final String STATE_PARTITIONER = "statePartitioner";
+
+ /** Config key under which the serialized {@link KeyGroupAssigner} (key to key group index mapping) is stored */
+ private static final String KEY_GROUP_ASSIGNER = "keyGroupAssigner";
+
private static final String STATE_KEY_SERIALIZER = "statekeyser";
private static final String TIME_CHARACTERISTIC = "timechar";
@@ -402,10 +407,12 @@ public class StreamConfig implements Serializable {
// ------------------------------------------------------------------------
public void setStateBackend(AbstractStateBackend backend) {
- try {
- InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
- } catch (Exception e) {
- throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+ if (backend != null) {
+ try {
+ InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
+ } catch (Exception e) {
+ throw new StreamTaskException("Could not serialize stateHandle provider.", e);
+ }
}
}
@@ -416,6 +423,10 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate statehandle provider.", e);
}
}
+
+ public byte[] getSerializedStateBackend() {
+ return this.config.getBytes(STATE_BACKEND, null);
+ }
public void setStatePartitioner(int input, KeySelector<?, ?> partitioner) {
try {
@@ -432,6 +443,34 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate state partitioner.", e);
}
}
+
+ /**
+ * Sets the {@link KeyGroupAssigner} to be used for the current {@link StreamOperator}.
+ *
+ * @param keyGroupAssigner Key group assigner to be used
+ */
+ public void setKeyGroupAssigner(KeyGroupAssigner<?> keyGroupAssigner) {
+ try {
+ InstantiationUtil.writeObjectToConfig(keyGroupAssigner, this.config, KEY_GROUP_ASSIGNER);
+ } catch (Exception e) {
+ throw new StreamTaskException("Could not serialize virtual state partitioner.", e);
+ }
+ }
+
+ /**
+ * Gets the {@link KeyGroupAssigner} for the {@link StreamOperator}.
+ *
+ * @param classLoader Classloader to be used for the deserialization
+ * @param <K> Type of the keys to be assigned to key groups
+ * @return Key group assigner
+ */
+ public <K> KeyGroupAssigner<K> getKeyGroupAssigner(ClassLoader classLoader) {
+ try {
+ return InstantiationUtil.readObjectFromConfig(this.config, KEY_GROUP_ASSIGNER, classLoader);
+ } catch (Exception e) {
+ throw new StreamTaskException("Could not instantiate virtual state partitioner.", e);
+ }
+ }
public void setStateKeySerializer(TypeSerializer<?> serializer) {
try {
@@ -448,6 +487,8 @@ public class StreamConfig implements Serializable {
throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
}
}
+
+
// ------------------------------------------------------------------------
// Miscellansous
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4be2874..c946e98 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -50,6 +50,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -357,6 +358,18 @@ public class StreamGraph extends StreamingPlan {
if (partitioner == null) {
partitioner = virtuaPartitionNodes.get(virtualId).f1;
}
+
+ if (partitioner instanceof ConfigurableStreamPartitioner) {
+ StreamNode downstreamNode = getStreamNode(downStreamVertexID);
+
+ ConfigurableStreamPartitioner configurableStreamPartitioner = (ConfigurableStreamPartitioner) partitioner;
+
+ // Configure the partitioner with the max parallelism. This is necessary if the
+ // partitioner has been created before the maximum parallelism has been set. The
+ // maximum parallelism is necessary for the key group mapping.
+ configurableStreamPartitioner.configure(downstreamNode.getMaxParallelism());
+ }
+
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
@@ -407,6 +420,12 @@ public class StreamGraph extends StreamingPlan {
}
}
+ public void setMaxParallelism(int vertexID, int maxParallelism) {
+ if (getStreamNode(vertexID) != null) {
+ getStreamNode(vertexID).setMaxParallelism(maxParallelism);
+ }
+ }
+
public void setOneInputStateKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
StreamNode node = getStreamNode(vertexID);
node.setStatePartitioner1(keySelector);
@@ -514,7 +533,13 @@ public class StreamGraph extends StreamingPlan {
return vertexIDtoLoopTimeout.get(vertexID);
}
- public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
+ public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(
+ int loopId,
+ int sourceId,
+ int sinkId,
+ long timeout,
+ int parallelism,
+ int maxParallelism) {
StreamNode source = this.addNode(sourceId,
null,
StreamIterationHead.class,
@@ -522,6 +547,7 @@ public class StreamGraph extends StreamingPlan {
"IterationSource-" + loopId);
sources.add(source.getId());
setParallelism(source.getId(), parallelism);
+ setMaxParallelism(source.getId(), maxParallelism);
StreamNode sink = this.addNode(sinkId,
null,
@@ -530,6 +556,7 @@ public class StreamGraph extends StreamingPlan {
"IterationSink-" + loopId);
sinks.add(sink.getId());
setParallelism(sink.getId(), parallelism);
+ setMaxParallelism(sink.getId(), maxParallelism);
iterationSourceSinkPairs.add(new Tuple2<>(source, sink));
@@ -604,8 +631,4 @@ public class StreamGraph extends StreamingPlan {
}
}
}
-
- public static enum ResourceStrategy {
- DEFAULT, ISOLATE, NEWGROUP
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index de80e25..81c5c48 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -144,6 +144,31 @@ public class StreamGraphGenerator {
LOG.debug("Transforming " + transform);
+ if (transform.getMaxParallelism() <= 0) {
+ // if the max parallelism hasn't been set, then first use the job wide max parallelism
+ // from the ExecutionConfig. If this value has not been specified either, then use the
+ // parallelism of the operator.
+ int maxParallelism = env.getConfig().getMaxParallelism();
+
+ if (maxParallelism <= 0) {
+ maxParallelism = transform.getParallelism();
+
+ /**
+ * TODO: Remove once the parallelism setting works properly in Flink (FLINK-3885)
+ * Currently, the parallelism will be set to 1 on the JobManager iff it encounters
+ * a negative parallelism value. We need to know this for the
+ * KeyGroupStreamPartitioner on the client-side. Thus, we already set the value to
+ * 1 here.
+ */
+ if (maxParallelism <= 0) {
+ transform.setParallelism(1);
+ maxParallelism = 1;
+ }
+ }
+
+ transform.setMaxParallelism(maxParallelism);
+ }
+
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
@@ -309,11 +334,12 @@ public class StreamGraphGenerator {
// create the fake iteration source/sink pair
Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
- iterate.getId(),
- getNewIterationNodeId(),
- getNewIterationNodeId(),
- iterate.getWaitTime(),
- iterate.getParallelism());
+ iterate.getId(),
+ getNewIterationNodeId(),
+ getNewIterationNodeId(),
+ iterate.getWaitTime(),
+ iterate.getParallelism(),
+ iterate.getMaxParallelism());
StreamNode itSource = itSourceAndSink.f0;
StreamNode itSink = itSourceAndSink.f1;
@@ -377,7 +403,8 @@ public class StreamGraphGenerator {
getNewIterationNodeId(),
getNewIterationNodeId(),
coIterate.getWaitTime(),
- coIterate.getParallelism());
+ coIterate.getParallelism(),
+ coIterate.getMaxParallelism());
StreamNode itSource = itSourceAndSink.f0;
StreamNode itSink = itSourceAndSink.f1;
@@ -430,6 +457,7 @@ public class StreamGraphGenerator {
streamGraph.setInputFormat(source.getId(), fs.getFormat());
}
streamGraph.setParallelism(source.getId(), source.getParallelism());
+ streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
return Collections.singleton(source.getId());
}
@@ -450,6 +478,7 @@ public class StreamGraphGenerator {
"Sink: " + sink.getName());
streamGraph.setParallelism(sink.getId(), sink.getParallelism());
+ streamGraph.setMaxParallelism(sink.getId(), sink.getMaxParallelism());
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId,
@@ -498,6 +527,7 @@ public class StreamGraphGenerator {
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+ streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
@@ -545,6 +575,7 @@ public class StreamGraphGenerator {
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
+ streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
for (Integer inputId: inputIds1) {
streamGraph.addEdge(inputId,
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 430760b..9051891 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Preconditions;
/**
* Class representing the operators in the streaming programs, with all their properties.
@@ -43,6 +44,11 @@ public class StreamNode implements Serializable {
private final int id;
private Integer parallelism = null;
+ /**
+ * Maximum parallelism for this stream node. The maximum parallelism is the upper limit for
+ * dynamic scaling and the number of key groups used for partitioned state.
+ */
+ private int maxParallelism;
private Long bufferTimeout = null;
private final String operatorName;
private String slotSharingGroup;
@@ -141,6 +147,25 @@ public class StreamNode implements Serializable {
this.parallelism = parallelism;
}
+ /**
+ * Get the maximum parallelism for this stream node.
+ *
+ * @return Maximum parallelism
+ */
+ int getMaxParallelism() {
+ return maxParallelism;
+ }
+
+ /**
+ * Set the maximum parallelism for this stream node.
+ *
+ * @param maxParallelism Maximum parallelism to be set
+ */
+ void setMaxParallelism(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be at least 1.");
+ this.maxParallelism = maxParallelism;
+ }
+
public Long getBufferTimeout() {
return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 28d982c..76fdaca 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -40,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -289,6 +291,21 @@ public class StreamingJobGraphGenerator {
if (parallelism > 0) {
jobVertex.setParallelism(parallelism);
+ } else {
+ parallelism = jobVertex.getParallelism();
+ }
+
+ int maxParallelism = streamNode.getMaxParallelism();
+
+ // the maximum parallelism specifies the upper bound for the parallelism
+ if (parallelism > maxParallelism) {
+ // the parallelism must always be less than or equal to the max parallelism
+ throw new IllegalStateException("The maximum parallelism (" + maxParallelism + ") of " +
+ "the stream node " + streamNode + " is smaller than the parallelism (" +
+ parallelism + "). Increase the maximum parallelism or decrease the parallelism of " +
+ "this operator.");
+ } else {
+ jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
}
if (LOG.isDebugEnabled()) {
@@ -325,7 +342,7 @@ public class StreamingJobGraphGenerator {
config.setTimeCharacteristic(streamGraph.getEnvironment().getStreamTimeCharacteristic());
final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig();
-
+
config.setStateBackend(streamGraph.getStateBackend());
config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
if (ceckpointCfg.isCheckpointingEnabled()) {
@@ -339,7 +356,15 @@ public class StreamingJobGraphGenerator {
config.setStatePartitioner(0, vertex.getStatePartitioner1());
config.setStatePartitioner(1, vertex.getStatePartitioner2());
config.setStateKeySerializer(vertex.getStateKeySerializer());
-
+
+ // only set the key group assigner if the vertex uses partitioned state (= KeyedStream).
+ if (vertex.getStatePartitioner1() != null) {
+ // the key group assigner has to know the number of key groups (= maxParallelism)
+ KeyGroupAssigner<Object> keyGroupAssigner = new HashKeyGroupAssigner<Object>(vertex.getMaxParallelism());
+
+ config.setKeyGroupAssigner(keyGroupAssigner);
+ }
+
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
if (vertexClass.equals(StreamIterationHead.class)
@@ -725,8 +750,6 @@ public class StreamingJobGraphGenerator {
// stream graph.
hasher.putInt(id);
- hasher.putInt(node.getParallelism());
-
if (node.getOperator() instanceof AbstractUdfStreamOperator) {
String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
.getUserFunction().getClass().getName();
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1d2a1bb..e674619 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -117,6 +117,12 @@ public abstract class StreamTransformation<T> {
private int parallelism;
/**
+ * The maximum parallelism for this stream transformation. It defines the upper limit for
+ * dynamic scaling and the number of key groups used for partitioned state.
+ */
+ private int maxParallelism = -1;
+
+ /**
* User-specified ID for this transformation. This is used to assign the
* same operator ID across job restarts. There is also the automatically
* generated {@link #id}, which is assigned from a static counter. That
@@ -181,6 +187,24 @@ public abstract class StreamTransformation<T> {
}
/**
+ * Gets the maximum parallelism for this stream transformation.
+ *
+ * @return Maximum parallelism of this transformation.
+ */
+ public int getMaxParallelism() {
+ return maxParallelism;
+ }
+
+ /**
+ * Sets the maximum parallelism for this stream transformation.
+ *
+ * @param maxParallelism Maximum parallelism for this stream transformation.
+ */
+ public void setMaxParallelism(int maxParallelism) {
+ this.maxParallelism = maxParallelism;
+ }
+
+ /**
* Sets an ID for this {@link StreamTransformation}.
*
* <p>The specified ID is used to assign the same operator ID across job
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
new file mode 100644
index 0000000..c59c88a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ConfigurableStreamPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+/**
+ * Interface for {@link StreamPartitioner} which have to be configured with the maximum parallelism
+ * of the stream transformation. The configure method is called by the StreamGraph when adding
+ * internal edges.
+ *
+ * This interface is required since the stream partitioners are instantiated eagerly. At that
+ * point the maximum parallelism might not have been determined yet, so it has to be set at a
+ * later stage, once the maximum parallelism is known.
+ */
+public interface ConfigurableStreamPartitioner {
+
+ /**
+ * Configure the {@link StreamPartitioner} with the maximum parallelism of the down stream
+ * operator.
+ *
+ * @param maxParallelism Maximum parallelism of the down stream operator.
+ */
+ void configure(int maxParallelism);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
deleted file mode 100644
index 3c93fb7..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/HashPartitioner.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.util.MathUtils;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Partitioner selects the target channel based on the hash value of a key from a
- * {@link KeySelector}.
- *
- * @param <T> Type of the elements in the Stream being partitioned
- */
-@Internal
-public class HashPartitioner<T> extends StreamPartitioner<T> {
- private static final long serialVersionUID = 1L;
-
- private int[] returnArray = new int[1];
- KeySelector<T, ?> keySelector;
-
- public HashPartitioner(KeySelector<T, ?> keySelector) {
- this.keySelector = keySelector;
- }
-
- @Override
- public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
- int numberOfOutputChannels) {
- Object key;
- try {
- key = keySelector.getKey(record.getInstance().getValue());
- } catch (Exception e) {
- throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
- }
- returnArray[0] = MathUtils.murmurHash(key.hashCode()) % numberOfOutputChannels;
-
- return returnArray;
- }
-
- @Override
- public StreamPartitioner<T> copy() {
- return this;
- }
-
- @Override
- public String toString() {
- return "HASH";
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
new file mode 100644
index 0000000..5bcf41b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyGroupAssigner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Partitioner selects the target channel based on the key group index. The key group
+ * index is derived from the key of the elements using the {@link KeyGroupAssigner}.
+ *
+ * @param <T> Type of the elements in the Stream being partitioned
+ */
+@Internal
+public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
+ private static final long serialVersionUID = 1L;
+
+ private final int[] returnArray = new int[1];
+
+ private final KeySelector<T, K> keySelector;
+
+ private final KeyGroupAssigner<K> keyGroupAssigner;
+
+ public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, KeyGroupAssigner<K> keyGroupAssigner) {
+ this.keySelector = Preconditions.checkNotNull(keySelector);
+ this.keyGroupAssigner = Preconditions.checkNotNull(keyGroupAssigner);
+ }
+
+ public KeyGroupAssigner<K> getKeyGroupAssigner() {
+ return keyGroupAssigner;
+ }
+
+ @Override
+ public int[] selectChannels(
+ SerializationDelegate<StreamRecord<T>> record,
+ int numberOfOutputChannels) {
+
+ K key;
+ try {
+ key = keySelector.getKey(record.getInstance().getValue());
+ } catch (Exception e) {
+ throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
+ }
+ returnArray[0] = keyGroupAssigner.getKeyGroupIndex(key) % numberOfOutputChannels;
+
+ return returnArray;
+ }
+
+ @Override
+ public StreamPartitioner<T> copy() {
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return "HASH";
+ }
+
+ @Override
+ public void configure(int maxParallelism) {
+ keyGroupAssigner.setup(maxParallelism);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index d0617d0..1183306 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -46,7 +46,7 @@ import org.junit.Test;
public class AggregationFunctionTest {
@Test
- public void groupSumIntegerTest() {
+ public void groupSumIntegerTest() throws Exception {
// preparing expected outputs
List<Tuple2<Integer, Integer>> expectedGroupSumList = new ArrayList<>();
@@ -115,7 +115,7 @@ public class AggregationFunctionTest {
}
@Test
- public void pojoGroupSumIntegerTest() {
+ public void pojoGroupSumIntegerTest() throws Exception {
// preparing expected outputs
List<MyPojo> expectedGroupSumList = new ArrayList<>();
@@ -183,7 +183,7 @@ public class AggregationFunctionTest {
}
@Test
- public void minMaxByTest() {
+ public void minMaxByTest() throws Exception {
// Tuples are grouped on field 0, aggregated on field 1
// preparing expected outputs
@@ -250,7 +250,7 @@ public class AggregationFunctionTest {
}
@Test
- public void pojoMinMaxByTest() {
+ public void pojoMinMaxByTest() throws Exception {
// Pojos are grouped on field 0, aggregated on field 1
// preparing expected outputs