You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2017/01/24 09:49:25 UTC
[1/2] flink git commit: [FLINK-5473] Limit max parallelism to 1 for
non-parallel operators
Repository: flink
Updated Branches:
refs/heads/release-1.2 908376ba9 -> 993a2e2fa
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 3fb4513..3fc1344 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -17,19 +17,12 @@
package org.apache.flink.streaming.api;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
@@ -38,10 +31,20 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
-
+import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
public class StreamExecutionEnvironmentTest {
@Test
@@ -124,6 +127,102 @@ public class StreamExecutionEnvironmentTest {
assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
}
+ @Test
+ public void testParallelismBounds() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ }
+
+ @Override
+ public void cancel() {
+ }
+ };
+
+
+ SingleOutputStreamOperator<Object> operator =
+ env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Integer value, Collector<Object> out) throws Exception {
+
+ }
+ });
+
+ // default value for max parallelism
+ Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+ // bounds for parallelism 1
+ try {
+ operator.setParallelism(0);
+ Assert.fail();
+ } catch (IllegalArgumentException expected) {
+ }
+
+ // bounds for parallelism 2
+ operator.setParallelism(1);
+ Assert.assertEquals(1, operator.getParallelism());
+
+ // bounds for parallelism 3
+ operator.setParallelism(1 << 15);
+ Assert.assertEquals(1 << 15, operator.getParallelism());
+
+ // default value after generating
+ env.getStreamGraph().getJobGraph();
+ Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+ // configured value after generating
+ env.setMaxParallelism(42);
+ env.getStreamGraph().getJobGraph();
+ Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
+
+ // bounds configured parallelism 1
+ try {
+ env.setMaxParallelism(0);
+ Assert.fail();
+ } catch (IllegalArgumentException expected) {
+ }
+
+ // bounds configured parallelism 2
+ try {
+ env.setMaxParallelism(1 + (1 << 15));
+ Assert.fail();
+ } catch (IllegalArgumentException expected) {
+ }
+
+ // bounds for max parallelism 1
+ try {
+ operator.setMaxParallelism(0);
+ Assert.fail();
+ } catch (IllegalArgumentException expected) {
+ }
+
+ // bounds for max parallelism 2
+ try {
+ operator.setMaxParallelism(1 + (1 << 15));
+ Assert.fail();
+ } catch (IllegalArgumentException expected) {
+ }
+
+ // bounds for max parallelism 3
+ operator.setMaxParallelism(1);
+ Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
+
+ // bounds for max parallelism 4
+ operator.setMaxParallelism(1 << 15);
+ Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+
+ // override config
+ env.getStreamGraph().getJobGraph();
+ Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism());
+ }
+
/////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index aa86304..5fdacd4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -22,13 +22,12 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -46,7 +44,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
-
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -368,13 +365,9 @@ public class StreamGraphGeneratorTest {
StreamGraph graph = env.getStreamGraph();
- StreamNode keyedResult1Node = graph.getStreamNode(keyedResult1.getId());
- StreamNode keyedResult2Node = graph.getStreamNode(keyedResult2.getId());
StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());
- assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult1Node.getMaxParallelism());
- assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult2Node.getMaxParallelism());
assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8045d82..073632a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -122,19 +123,29 @@ public class RescalingITCase extends TestLogger {
@Test
public void testSavepointRescalingInKeyedState() throws Exception {
- testSavepointRescalingKeyedState(false);
+ testSavepointRescalingKeyedState(false, false);
}
@Test
public void testSavepointRescalingOutKeyedState() throws Exception {
- testSavepointRescalingKeyedState(true);
+ testSavepointRescalingKeyedState(true, false);
+ }
+
+ @Test
+ public void testSavepointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+ testSavepointRescalingKeyedState(false, true);
+ }
+
+ @Test
+ public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+ testSavepointRescalingKeyedState(true, true);
}
/**
* Tests that a job with purely keyed state can be restarted from a savepoint
* with a different parallelism.
*/
- public void testSavepointRescalingKeyedState(boolean scaleOut) throws Exception {
+ public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
final int numberKeys = 42;
final int numberElements = 1000;
final int numberElements2 = 500;
@@ -194,7 +205,9 @@ public class RescalingITCase extends TestLogger {
jobID = null;
- JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, maxParallelism, numberKeys, numberElements2, true, 100);
+ int restoreMaxParallelism = deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism;
+
+ JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, restoreMaxParallelism, numberKeys, numberElements2, true, 100);
scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
@@ -642,7 +655,9 @@ public class RescalingITCase extends TestLogger {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(parallelism);
- env.getConfig().setMaxParallelism(maxParallelism);
+ if (0 < maxParallelism) {
+ env.getConfig().setMaxParallelism(maxParallelism);
+ }
env.enableCheckpointing(checkpointingInterval);
env.setRestartStrategy(RestartStrategies.noRestart());
[2/2] flink git commit: [FLINK-5473] Limit max parallelism to 1 for
non-parallel operators
Posted by rm...@apache.org.
[FLINK-5473] Limit max parallelism to 1 for non-parallel operators
[FLINK-5473] Better default behaviours for unspecified maximum parallelism
This closes #3182.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/993a2e2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/993a2e2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/993a2e2f
Branch: refs/heads/release-1.2
Commit: 993a2e2fa0ceecff0979a267ace7cd7b8e05d359
Parents: 908376b
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Jan 16 14:31:22 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jan 24 09:49:52 2017 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 4 +-
.../checkpoint/StateAssignmentOperation.java | 285 ++++++++++---------
.../checkpoint/savepoint/SavepointLoader.java | 8 +-
.../ResultPartitionDeploymentDescriptor.java | 18 +-
.../runtime/executiongraph/ExecutionGraph.java | 8 +-
.../executiongraph/ExecutionJobVertex.java | 115 ++++++--
.../runtime/executiongraph/ExecutionVertex.java | 31 +-
.../api/writer/ResultPartitionWriter.java | 4 +
.../io/network/partition/ResultPartition.java | 8 +
.../flink/runtime/jobgraph/JobVertex.java | 7 +-
.../runtime/state/KeyGroupRangeAssignment.java | 42 ++-
.../apache/flink/runtime/taskmanager/Task.java | 1 +
.../checkpoint/CheckpointCoordinatorTest.java | 1 +
.../savepoint/SavepointLoaderTest.java | 1 +
...ResultPartitionDeploymentDescriptorTest.java | 1 +
.../executiongraph/ExecutionJobVertexTest.java | 140 +++++++++
.../ExecutionVertexDeploymentTest.java | 27 +-
.../network/partition/ResultPartitionTest.java | 1 +
.../consumer/LocalInputChannelTest.java | 1 +
.../runtime/jobmanager/JobManagerTest.java | 2 +
.../runtime/taskmanager/TaskManagerTest.java | 5 +-
.../streaming/api/datastream/KeyedStream.java | 13 +-
.../datastream/SingleOutputStreamOperator.java | 29 +-
.../environment/StreamExecutionEnvironment.java | 12 +-
.../flink/streaming/api/graph/StreamGraph.java | 37 +--
.../api/graph/StreamGraphGenerator.java | 26 +-
.../flink/streaming/api/graph/StreamNode.java | 2 -
.../api/graph/StreamingJobGraphGenerator.java | 13 +-
.../transformations/StreamTransformation.java | 5 +
.../partitioner/KeyGroupStreamPartitioner.java | 1 +
.../streaming/runtime/tasks/OperatorChain.java | 10 +
.../api/StreamExecutionEnvironmentTest.java | 119 +++++++-
.../api/graph/StreamGraphGeneratorTest.java | 9 +-
.../test/checkpointing/RescalingITCase.java | 25 +-
34 files changed, 723 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9132897..78cad91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -885,8 +885,10 @@ public class CheckpointCoordinator {
LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+ final Map<JobVertexID, TaskState> taskStates = latest.getTaskStates();
+
StateAssignmentOperation stateAssignmentOperation =
- new StateAssignmentOperation(LOG, tasks, latest, allowNonRestoredState);
+ new StateAssignmentOperation(LOG, tasks, taskStates, allowNonRestoredState);
stateAssignmentOperation.assignStates();
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 6c23f02..6d075db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -45,29 +45,34 @@ public class StateAssignmentOperation {
private final Logger logger;
private final Map<JobVertexID, ExecutionJobVertex> tasks;
- private final CompletedCheckpoint latest;
+ private final Map<JobVertexID, TaskState> taskStates;
private final boolean allowNonRestoredState;
public StateAssignmentOperation(
Logger logger,
Map<JobVertexID, ExecutionJobVertex> tasks,
- CompletedCheckpoint latest,
+ Map<JobVertexID, TaskState> taskStates,
boolean allowNonRestoredState) {
- this.logger = logger;
- this.tasks = tasks;
- this.latest = latest;
+ this.logger = Preconditions.checkNotNull(logger);
+ this.tasks = Preconditions.checkNotNull(tasks);
+ this.taskStates = Preconditions.checkNotNull(taskStates);
this.allowNonRestoredState = allowNonRestoredState;
}
public boolean assignStates() throws Exception {
+ // this tracks if we find missing node hash ids and already use secondary mappings
boolean expandedToLegacyIds = false;
+
Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
- for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : latest.getTaskStates().entrySet()) {
+ for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
TaskState taskState = taskGroupStateEntry.getValue();
+
+ //----------------------------------------find vertex for state---------------------------------------------
+
ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
// on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
@@ -89,8 +94,31 @@ public class StateAssignmentOperation {
}
}
- // check that the number of key groups have not changed
- if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
+ checkParallelismPreconditions(taskState, executionJobVertex);
+
+ assignTaskStatesToOperatorInstances(taskState, executionJobVertex);
+ }
+
+ return true;
+ }
+
+ private void checkParallelismPreconditions(TaskState taskState, ExecutionJobVertex executionJobVertex) {
+ //----------------------------------------max parallelism preconditions-------------------------------------
+
+ // check that the number of key groups has not changed, or whether we need to override it to satisfy the restored state
+ if (taskState.getMaxParallelism() != executionJobVertex.getMaxParallelism()) {
+
+ if (!executionJobVertex.isMaxParallelismConfigured()) {
+ // if the max parallelism was not explicitly specified by the user, we derive it from the state
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Overriding maximum parallelism for JobVertex " + executionJobVertex.getJobVertexId()
+ + " from " + executionJobVertex.getMaxParallelism() + " to " + taskState.getMaxParallelism());
+ }
+
+ executionJobVertex.setMaxParallelism(taskState.getMaxParallelism());
+ } else {
+ // if the max parallelism was explicitly specified, we complain on mismatch
throw new IllegalStateException("The maximum parallelism (" +
taskState.getMaxParallelism() + ") with which the latest " +
"checkpoint of the execution job vertex " + executionJobVertex +
@@ -98,159 +126,162 @@ public class StateAssignmentOperation {
executionJobVertex.getMaxParallelism() + ") changed. This " +
"is currently not supported.");
}
+ }
- //-------------------------------------------------------------------
+ //----------------------------------------parallelism preconditions-----------------------------------------
- final int oldParallelism = taskState.getParallelism();
- final int newParallelism = executionJobVertex.getParallelism();
- final boolean parallelismChanged = oldParallelism != newParallelism;
- final boolean hasNonPartitionedState = taskState.hasNonPartitionedState();
+ final int oldParallelism = taskState.getParallelism();
+ final int newParallelism = executionJobVertex.getParallelism();
- if (hasNonPartitionedState && parallelismChanged) {
- throw new IllegalStateException("Cannot restore the latest checkpoint because " +
- "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
- "state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() +
- " has parallelism " + newParallelism + " whereas the corresponding " +
- "state object has a parallelism of " + oldParallelism);
- }
+ if (taskState.hasNonPartitionedState() && (oldParallelism != newParallelism)) {
+ throw new IllegalStateException("Cannot restore the latest checkpoint because " +
+ "the operator " + executionJobVertex.getJobVertexId() + " has non-partitioned " +
+ "state and its parallelism changed. The operator " + executionJobVertex.getJobVertexId() +
+ " has parallelism " + newParallelism + " whereas the corresponding " +
+ "state object has a parallelism of " + oldParallelism);
+ }
+ }
- List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
- executionJobVertex.getMaxParallelism(),
- newParallelism);
+ private static void assignTaskStatesToOperatorInstances(
+ TaskState taskState, ExecutionJobVertex executionJobVertex) {
- final int chainLength = taskState.getChainLength();
+ final int oldParallelism = taskState.getParallelism();
+ final int newParallelism = executionJobVertex.getParallelism();
- // operator chain idx -> list of the stored op states from all parallel instances for this chain idx
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
- @SuppressWarnings("unchecked")
- List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
+ List<KeyGroupRange> keyGroupPartitions = createKeyGroupPartitions(
+ executionJobVertex.getMaxParallelism(),
+ newParallelism);
- List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
- List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
+ final int chainLength = taskState.getChainLength();
- for (int p = 0; p < oldParallelism; ++p) {
- SubtaskState subtaskState = taskState.getState(p);
+ // operator chain idx -> list of the stored op states from all parallel instances for this chain idx
+ @SuppressWarnings("unchecked")
+ List<OperatorStateHandle>[] parallelOpStatesBackend = new List[chainLength];
+ @SuppressWarnings("unchecked")
+ List<OperatorStateHandle>[] parallelOpStatesStream = new List[chainLength];
- if (null != subtaskState) {
- collectParallelStatesByChainOperator(
- parallelOpStatesBackend, subtaskState.getManagedOperatorState());
+ List<KeyGroupsStateHandle> parallelKeyedStatesBackend = new ArrayList<>(oldParallelism);
+ List<KeyGroupsStateHandle> parallelKeyedStateStream = new ArrayList<>(oldParallelism);
- collectParallelStatesByChainOperator(
- parallelOpStatesStream, subtaskState.getRawOperatorState());
+ for (int p = 0; p < oldParallelism; ++p) {
+ SubtaskState subtaskState = taskState.getState(p);
- KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
- if (null != keyedStateBackend) {
- parallelKeyedStatesBackend.add(keyedStateBackend);
- }
+ if (null != subtaskState) {
+ collectParallelStatesByChainOperator(
+ parallelOpStatesBackend, subtaskState.getManagedOperatorState());
- KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
- if (null != keyedStateStream) {
- parallelKeyedStateStream.add(keyedStateStream);
- }
+ collectParallelStatesByChainOperator(
+ parallelOpStatesStream, subtaskState.getRawOperatorState());
+
+ KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
+ if (null != keyedStateBackend) {
+ parallelKeyedStatesBackend.add(keyedStateBackend);
+ }
+
+ KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
+ if (null != keyedStateStream) {
+ parallelKeyedStateStream.add(keyedStateStream);
}
}
+ }
- // operator chain index -> lists with collected states (one collection for each parallel subtasks)
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
+ // operator chain index -> lists with collected states (one collection for each parallel subtasks)
+ @SuppressWarnings("unchecked")
+ List<Collection<OperatorStateHandle>>[] partitionedParallelStatesBackend = new List[chainLength];
- @SuppressWarnings("unchecked")
- List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
+ @SuppressWarnings("unchecked")
+ List<Collection<OperatorStateHandle>>[] partitionedParallelStatesStream = new List[chainLength];
- //TODO here we can employ different redistribution strategies for state, e.g. union state.
- // For now we only offer round robin as the default.
- OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
+ //TODO here we can employ different redistribution strategies for state, e.g. union state.
+ // For now we only offer round robin as the default.
+ OperatorStateRepartitioner opStateRepartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
- for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
+ for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
- List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
- List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
+ List<OperatorStateHandle> chainOpParallelStatesBackend = parallelOpStatesBackend[chainIdx];
+ List<OperatorStateHandle> chainOpParallelStatesStream = parallelOpStatesStream[chainIdx];
- partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
- opStateRepartitioner,
- chainOpParallelStatesBackend,
- oldParallelism,
- newParallelism);
+ partitionedParallelStatesBackend[chainIdx] = applyRepartitioner(
+ opStateRepartitioner,
+ chainOpParallelStatesBackend,
+ oldParallelism,
+ newParallelism);
- partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
- opStateRepartitioner,
- chainOpParallelStatesStream,
- oldParallelism,
- newParallelism);
- }
+ partitionedParallelStatesStream[chainIdx] = applyRepartitioner(
+ opStateRepartitioner,
+ chainOpParallelStatesStream,
+ oldParallelism,
+ newParallelism);
+ }
- for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
- // non-partitioned state
- ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
+ for (int subTaskIdx = 0; subTaskIdx < newParallelism; ++subTaskIdx) {
+ // non-partitioned state
+ ChainedStateHandle<StreamStateHandle> nonPartitionableState = null;
- if (!parallelismChanged) {
- if (taskState.getState(subTaskIdx) != null) {
- nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
- }
+ if (oldParallelism == newParallelism) {
+ if (taskState.getState(subTaskIdx) != null) {
+ nonPartitionableState = taskState.getState(subTaskIdx).getLegacyOperatorState();
}
+ }
- // partitionable state
- @SuppressWarnings("unchecked")
- Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
- @SuppressWarnings("unchecked")
- Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
- List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
- List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
-
- for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
- List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
- partitionedParallelStatesBackend[chainIdx];
+ // partitionable state
+ @SuppressWarnings("unchecked")
+ Collection<OperatorStateHandle>[] iab = new Collection[chainLength];
+ @SuppressWarnings("unchecked")
+ Collection<OperatorStateHandle>[] ias = new Collection[chainLength];
+ List<Collection<OperatorStateHandle>> operatorStateFromBackend = Arrays.asList(iab);
+ List<Collection<OperatorStateHandle>> operatorStateFromStream = Arrays.asList(ias);
- List<Collection<OperatorStateHandle>> redistributedOpStateStream =
- partitionedParallelStatesStream[chainIdx];
+ for (int chainIdx = 0; chainIdx < partitionedParallelStatesBackend.length; ++chainIdx) {
+ List<Collection<OperatorStateHandle>> redistributedOpStateBackend =
+ partitionedParallelStatesBackend[chainIdx];
- if (redistributedOpStateBackend != null) {
- operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
- }
+ List<Collection<OperatorStateHandle>> redistributedOpStateStream =
+ partitionedParallelStatesStream[chainIdx];
- if (redistributedOpStateStream != null) {
- operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
- }
+ if (redistributedOpStateBackend != null) {
+ operatorStateFromBackend.set(chainIdx, redistributedOpStateBackend.get(subTaskIdx));
}
- Execution currentExecutionAttempt = executionJobVertex
- .getTaskVertices()[subTaskIdx]
- .getCurrentExecutionAttempt();
+ if (redistributedOpStateStream != null) {
+ operatorStateFromStream.set(chainIdx, redistributedOpStateStream.get(subTaskIdx));
+ }
+ }
- List<KeyGroupsStateHandle> newKeyedStatesBackend;
- List<KeyGroupsStateHandle> newKeyedStateStream;
- if (parallelismChanged) {
- KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
- newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
- newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+ Execution currentExecutionAttempt = executionJobVertex
+ .getTaskVertices()[subTaskIdx]
+ .getCurrentExecutionAttempt();
+
+ List<KeyGroupsStateHandle> newKeyedStatesBackend;
+ List<KeyGroupsStateHandle> newKeyedStateStream;
+ if (oldParallelism == newParallelism) {
+ SubtaskState subtaskState = taskState.getState(subTaskIdx);
+ if (subtaskState != null) {
+ KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
+ KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
+ newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
+ oldKeyedStatesBackend) : null;
+ newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
+ oldKeyedStatesStream) : null;
} else {
- SubtaskState subtaskState = taskState.getState(subTaskIdx);
- if (subtaskState != null) {
- KeyGroupsStateHandle oldKeyedStatesBackend = subtaskState.getManagedKeyedState();
- KeyGroupsStateHandle oldKeyedStatesStream = subtaskState.getRawKeyedState();
- newKeyedStatesBackend = oldKeyedStatesBackend != null ? Collections.singletonList(
- oldKeyedStatesBackend) : null;
- newKeyedStateStream = oldKeyedStatesStream != null ? Collections.singletonList(
- oldKeyedStatesStream) : null;
- } else {
- newKeyedStatesBackend = null;
- newKeyedStateStream = null;
- }
+ newKeyedStatesBackend = null;
+ newKeyedStateStream = null;
}
+ } else {
+ KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(subTaskIdx);
+ newKeyedStatesBackend = getKeyGroupsStateHandles(parallelKeyedStatesBackend, subtaskKeyGroupIds);
+ newKeyedStateStream = getKeyGroupsStateHandles(parallelKeyedStateStream, subtaskKeyGroupIds);
+ }
- TaskStateHandles taskStateHandles = new TaskStateHandles(
- nonPartitionableState,
- operatorStateFromBackend,
- operatorStateFromStream,
- newKeyedStatesBackend,
- newKeyedStateStream);
+ TaskStateHandles taskStateHandles = new TaskStateHandles(
+ nonPartitionableState,
+ operatorStateFromBackend,
+ operatorStateFromStream,
+ newKeyedStatesBackend,
+ newKeyedStateStream);
- currentExecutionAttempt.setInitialState(taskStateHandles);
- }
+ currentExecutionAttempt.setInitialState(taskStateHandles);
}
-
- return true;
}
/**
@@ -298,7 +329,7 @@ public class StateAssignmentOperation {
/**
* @param chainParallelOpStates array = chain ops, array[idx] = parallel states for this chain op.
- * @param chainOpState
+ * @param chainOpState the operator chain
*/
private static void collectParallelStatesByChainOperator(
List<OperatorStateHandle>[] chainParallelOpStates, ChainedStateHandle<OperatorStateHandle> chainOpState) {
@@ -359,4 +390,4 @@ public class StateAssignmentOperation {
return repackStream;
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index d6be482..950a9a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -81,10 +81,11 @@ public class SavepointLoader {
}
if (executionJobVertex != null) {
- if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) {
+
+ if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()
+ || !executionJobVertex.isMaxParallelismConfigured()) {
taskStates.put(taskState.getJobVertexID(), taskState);
- }
- else {
+ } else {
String msg = String.format("Failed to rollback to savepoint %s. " +
"Max parallelism mismatch between savepoint state and new program. " +
"Cannot map operator %s with max parallelism %d to new program with " +
@@ -106,6 +107,7 @@ public class SavepointLoader {
"you want to allow to skip this, you can set the --allowNonRestoredState " +
"option on the CLI.",
savepointPath, taskState.getJobVertexID());
+
throw new IllegalStateException(msg);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 14c7d2a..061f925 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import java.io.Serializable;
@@ -36,6 +37,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ResultPartitionDeploymentDescriptor implements Serializable {
+ private static final long serialVersionUID = 6343547936086963705L;
+
/** The ID of the result this partition belongs to. */
private final IntermediateDataSetID resultId;
@@ -47,6 +50,9 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
/** The number of subpartitions. */
private final int numberOfSubpartitions;
+
+ /** The maximum parallelism */
+ private final int maxParallelism;
/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
private final boolean sendScheduleOrUpdateConsumersMessage;
@@ -56,14 +62,17 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
IntermediateResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
+ int maxParallelism,
boolean lazyScheduling) {
this.resultId = checkNotNull(resultId);
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
+ KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
checkArgument(numberOfSubpartitions >= 1);
this.numberOfSubpartitions = numberOfSubpartitions;
+ this.maxParallelism = maxParallelism;
this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
}
@@ -83,6 +92,10 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
return numberOfSubpartitions;
}
+ public int getMaxParallelism() {
+ return maxParallelism;
+ }
+
public boolean sendScheduleOrUpdateConsumersMessage() {
return sendScheduleOrUpdateConsumersMessage;
}
@@ -96,7 +109,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
// ------------------------------------------------------------------------
- public static ResultPartitionDeploymentDescriptor from(IntermediateResultPartition partition, boolean lazyScheduling) {
+ public static ResultPartitionDeploymentDescriptor from(
+ IntermediateResultPartition partition, int maxParallelism, boolean lazyScheduling) {
final IntermediateDataSetID resultId = partition.getIntermediateResult().getId();
final IntermediateResultPartitionID partitionId = partition.getPartitionId();
@@ -118,6 +132,6 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
}
return new ResultPartitionDeploymentDescriptor(
- resultId, partitionId, partitionType, numberOfSubpartitions, lazyScheduling);
+ resultId, partitionId, partitionType, numberOfSubpartitions, maxParallelism, lazyScheduling);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f31eada..b9e6c84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -663,12 +663,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
// create the execution job vertex and attach it to the graph
- ExecutionJobVertex ejv = null;
- try {
- ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
- } catch (IOException e) {
- throw new JobException("Could not create a execution job vertex for " + jobVertex.getID() + '.', e);
- }
+ ExecutionJobVertex ejv =
+ new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
ejv.connectToPredecessors(this.intermediateResults);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index fbab572..e8664f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
@@ -57,8 +57,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
/** Use the same log for all ExecutionGraph classes */
private static final Logger LOG = ExecutionGraph.LOG;
-
- private final SerializableObject stateMonitor = new SerializableObject();
+
+ public static final int VALUE_NOT_SET = -1;
+
+ private final Object stateMonitor = new Object();
private final ExecutionGraph graph;
@@ -66,30 +68,32 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private final ExecutionVertex[] taskVertices;
- private IntermediateResult[] producedDataSets;
+ private final IntermediateResult[] producedDataSets;
private final List<IntermediateResult> inputs;
private final int parallelism;
- private final int maxParallelism;
-
private final boolean[] finishedSubtasks;
-
- private volatile int numSubtasksInFinalState;
-
+
private final SlotSharingGroup slotSharingGroup;
-
+
private final CoLocationGroup coLocationGroup;
-
+
private final InputSplit[] inputSplits;
+ private final boolean maxParallelismConfigured;
+
+ private int maxParallelism;
+
+ private volatile int numSubtasksInFinalState;
+
/**
* Serialized task information which is for all sub tasks the same. Thus, it avoids to
* serialize the same information multiple times in order to create the
* TaskDeploymentDescriptors.
*/
- private final SerializedValue<TaskInformation> serializedTaskInformation;
+ private SerializedValue<TaskInformation> serializedTaskInformation;
private InputSplitAssigner splitAssigner;
@@ -97,7 +101,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
- Time timeout) throws JobException, IOException {
+ Time timeout) throws JobException {
this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
}
@@ -107,7 +111,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
JobVertex jobVertex,
int defaultParallelism,
Time timeout,
- long createTimestamp) throws JobException, IOException {
+ long createTimestamp) throws JobException {
if (graph == null || jobVertex == null) {
throw new NullPointerException();
@@ -121,24 +125,19 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
this.parallelism = numTaskVertices;
- int maxP = jobVertex.getMaxParallelism();
+ final int configuredMaxParallelism = jobVertex.getMaxParallelism();
+
+ this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);
- Preconditions.checkArgument(maxP >= parallelism, "The maximum parallelism (" +
- maxP + ") must be greater or equal than the parallelism (" + parallelism +
- ").");
- this.maxParallelism = maxP;
+ // if no max parallelism was configured by the user, we calculate and set a default
+ setMaxParallelismInternal(maxParallelismConfigured ?
+ configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism));
- this.serializedTaskInformation = new SerializedValue<>(new TaskInformation(
- jobVertex.getID(),
- jobVertex.getName(),
- parallelism,
- maxParallelism,
- jobVertex.getInvokableClassName(),
- jobVertex.getConfiguration()));
+ this.serializedTaskInformation = null;
this.taskVertices = new ExecutionVertex[numTaskVertices];
- this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());
+ this.inputs = new ArrayList<>(jobVertex.getInputs().size());
// take the sharing group
this.slotSharingGroup = jobVertex.getSlotSharingGroup();
@@ -212,6 +211,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
finishedSubtasks = new boolean[parallelism];
}
+ public void setMaxParallelism(int maxParallelismDerived) {
+
+ Preconditions.checkState(!maxParallelismConfigured,
+ "Attempt to override a configured max parallelism. Configured: " + this.maxParallelism
+ + ", argument: " + maxParallelismDerived);
+
+ setMaxParallelismInternal(maxParallelismDerived);
+ }
+
+ private void setMaxParallelismInternal(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0
+ && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ "Overriding max parallelism is not in valid bounds (1.." +
+ KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism);
+
+ this.maxParallelism = maxParallelism;
+ }
+
public ExecutionGraph getGraph() {
return graph;
}
@@ -235,6 +252,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return maxParallelism;
}
+ public boolean isMaxParallelismConfigured() {
+ return maxParallelismConfigured;
+ }
+
public JobID getJobId() {
return graph.getJobID();
}
@@ -269,24 +290,56 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
return inputs;
}
- public SerializedValue<TaskInformation> getSerializedTaskInformation() {
+ public SerializedValue<TaskInformation> getSerializedTaskInformation() throws IOException {
+
+ if (null == serializedTaskInformation) {
+
+ int parallelism = getParallelism();
+ int maxParallelism = getMaxParallelism();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating task information for " + generateDebugString());
+ }
+
+ serializedTaskInformation = new SerializedValue<>(
+ new TaskInformation(
+ jobVertex.getID(),
+ jobVertex.getName(),
+ parallelism,
+ maxParallelism,
+ jobVertex.getInvokableClassName(),
+ jobVertex.getConfiguration()));
+ }
+
return serializedTaskInformation;
}
-
+
public boolean isInFinalState() {
return numSubtasksInFinalState == parallelism;
}
-
+
@Override
public ExecutionState getAggregateState() {
int[] num = new int[ExecutionState.values().length];
for (ExecutionVertex vertex : this.taskVertices) {
num[vertex.getExecutionState().ordinal()]++;
}
-
+
return getAggregateJobVertexState(num, parallelism);
}
+ private String generateDebugString() {
+
+ return "ExecutionJobVertex" +
+ "(" + jobVertex.getName() + " | " + jobVertex.getID() + ")" +
+ "{" +
+ "parallelism=" + parallelism +
+ ", maxParallelism=" + getMaxParallelism() +
+ ", maxParallelismConfigured=" + maxParallelismConfigured +
+ '}';
+ }
+
+
//---------------------------------------------------------------------------------------------
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 5cbd1c1..09497e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -40,13 +40,16 @@ import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -599,7 +602,24 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
boolean lazyScheduling = getExecutionGraph().getScheduleMode().allowLazyDeployment();
for (IntermediateResultPartition partition : resultPartitions.values()) {
- producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, lazyScheduling));
+
+ List<List<ExecutionEdge>> consumers = partition.getConsumers();
+
+ if (consumers.isEmpty()) {
+ // TODO: this case only exists for tests; currently there has to be exactly one consumer in real jobs!
+ producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
+ partition,
+ KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ lazyScheduling));
+ } else {
+ Preconditions.checkState(1 == consumers.size(),
+ "Only one consumer supported in the current implementation! Found: " + consumers.size());
+
+ List<ExecutionEdge> consumer = consumers.get(0);
+ ExecutionJobVertex vertex = consumer.get(0).getTarget().getJobVertex();
+ int maxParallelism = vertex.getMaxParallelism();
+ producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, maxParallelism, lazyScheduling));
+ }
}
@@ -620,7 +640,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
}
SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();
- SerializedValue<TaskInformation> serializedJobVertexInformation = jobVertex.getSerializedTaskInformation();
+ SerializedValue<TaskInformation> serializedJobVertexInformation = null;
+
+ try {
+ serializedJobVertexInformation = jobVertex.getSerializedTaskInformation();
+ } catch (IOException e) {
+ throw new ExecutionGraphException(
+ "Could not create a serialized JobVertexInformation for " + jobVertex.getJobVertexId(), e);
+ }
return new TaskDeploymentDescriptor(
serializedJobInformation,
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index cfab34d..2a1bed8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -63,6 +63,10 @@ public class ResultPartitionWriter implements EventListener<TaskEvent> {
return partition.getNumberOfSubpartitions();
}
+ public int getNumTargetKeyGroups() {
+ return partition.getNumTargetKeyGroups();
+ }
+
// ------------------------------------------------------------------------
// Data processing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 474c25c..3d92584 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -95,6 +95,8 @@ public class ResultPartition implements BufferPoolOwner {
private final ResultPartitionConsumableNotifier partitionConsumableNotifier;
+ public final int numTargetKeyGroups;
+
private final boolean sendScheduleOrUpdateConsumersMessage;
// - Runtime state --------------------------------------------------------
@@ -131,6 +133,7 @@ public class ResultPartition implements BufferPoolOwner {
ResultPartitionID partitionId,
ResultPartitionType partitionType,
int numberOfSubpartitions,
+ int numTargetKeyGroups,
ResultPartitionManager partitionManager,
ResultPartitionConsumableNotifier partitionConsumableNotifier,
IOManager ioManager,
@@ -142,6 +145,7 @@ public class ResultPartition implements BufferPoolOwner {
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
this.subpartitions = new ResultSubpartition[numberOfSubpartitions];
+ this.numTargetKeyGroups = numTargetKeyGroups;
this.partitionManager = checkNotNull(partitionManager);
this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier);
this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
@@ -356,6 +360,10 @@ public class ResultPartition implements BufferPoolOwner {
return cause;
}
+ public int getNumTargetKeyGroups() {
+ return numTargetKeyGroups;
+ }
+
/**
* Releases buffers held by this result partition.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index d24100e..9dcaeeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -40,7 +40,6 @@ public class JobVertex implements java.io.Serializable {
private static final String DEFAULT_NAME = "(unnamed vertex)";
-
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
@@ -60,7 +59,7 @@ public class JobVertex implements java.io.Serializable {
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
/** Maximum number of subtasks to split this taks into a runtime. */
- private int maxParallelism = Short.MAX_VALUE;
+ private int maxParallelism = -1;
/** Custom configuration passed to the assigned task at runtime. */
private Configuration configuration;
@@ -276,10 +275,6 @@ public class JobVertex implements java.io.Serializable {
* @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE.
*/
public void setMaxParallelism(int maxParallelism) {
- org.apache.flink.util.Preconditions.checkArgument(
- maxParallelism > 0 && maxParallelism <= (1 << 15),
- "The max parallelism must be at least 1 and smaller than 2^15.");
-
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 894f721..62bf3f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -23,7 +23,14 @@ import org.apache.flink.util.Preconditions;
public final class KeyGroupRangeAssignment {
- public static final int DEFAULT_MAX_PARALLELISM = 128;
+ /**
+ * The default lower bound for max parallelism if nothing was configured by the user. We have this to allow users
+ * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+ */
+ public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+ /** The (inclusive) upper bound for max parallelism */
+ public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
private KeyGroupRangeAssignment() {
throw new AssertionError();
@@ -79,9 +86,12 @@ public final class KeyGroupRangeAssignment {
int maxParallelism,
int parallelism,
int operatorIndex) {
- Preconditions.checkArgument(parallelism > 0, "Parallelism must not be smaller than zero.");
- Preconditions.checkArgument(maxParallelism >= parallelism, "Maximum parallelism must not be smaller than parallelism.");
- Preconditions.checkArgument(maxParallelism <= (1 << 15), "Maximum parallelism must be smaller than 2^15.");
+
+ checkParallelismPreconditions(parallelism);
+ checkParallelismPreconditions(maxParallelism);
+
+ Preconditions.checkArgument(maxParallelism >= parallelism,
+ "Maximum parallelism must not be smaller than parallelism.");
int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
@@ -105,4 +115,28 @@ public final class KeyGroupRangeAssignment {
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
return keyGroupId * parallelism / maxParallelism;
}
+
+ /**
+ * Computes a default maximum parallelism from the operator parallelism. This is used in case the user has not
+ * explicitly configured a maximum parallelism to still allow a certain degree of scale-up.
+ *
+ * @param operatorParallelism the operator parallelism as basis for computation.
+ * @return the computed default maximum parallelism.
+ */
+ public static int computeDefaultMaxParallelism(int operatorParallelism) {
+
+ checkParallelismPreconditions(operatorParallelism);
+
+ return Math.min(
+ Math.max(
+ MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
+ DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+ UPPER_BOUND_MAX_PARALLELISM);
+ }
+
+ public static void checkParallelismPreconditions(int parallelism) {
+ Preconditions.checkArgument(parallelism > 0
+ && parallelism <= UPPER_BOUND_MAX_PARALLELISM,
+ "Operator parallelism not within bounds: " + parallelism);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e945b93..3c57e3f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -343,6 +343,7 @@ public class Task implements Runnable, TaskActions {
partitionId,
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
+ desc.getMaxParallelism(),
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
ioManager,
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ca9dbc2..6ba557b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2607,6 +2607,7 @@ public class CheckpointCoordinatorTest {
when(executionJobVertex.getTaskVertices()).thenReturn(executionVertices);
when(executionJobVertex.getParallelism()).thenReturn(parallelism);
when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
+ when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
return executionJobVertex;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 67575d6..6471d6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -87,6 +87,7 @@ public class SavepointLoaderTest {
// 2) Load and validate: max parallelism mismatch
when(vertex.getMaxParallelism()).thenReturn(222);
+ when(vertex.isMaxParallelismConfigured()).thenReturn(true);
try {
SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 3ed8236..aac2e13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -46,6 +46,7 @@ public class ResultPartitionDeploymentDescriptorTest {
partitionId,
partitionType,
numberOfSubpartitions,
+ numberOfSubpartitions,
true);
ResultPartitionDeploymentDescriptor copy =
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
new file mode 100644
index 0000000..f0f6248
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ExecutionJobVertexTest {
+
+ private static final int NOT_CONFIGURED = -1;
+
+ @Test
+ public void testMaxParallelismDefaulting() throws Exception {
+
+ // default minimum
+ ExecutionJobVertex executionJobVertex = createExecutionJobVertex(1, NOT_CONFIGURED);
+ Assert.assertEquals(128, executionJobVertex.getMaxParallelism());
+
+ // test round up part 1
+ executionJobVertex = createExecutionJobVertex(171, NOT_CONFIGURED);
+ Assert.assertEquals(256, executionJobVertex.getMaxParallelism());
+
+ // test round up part 2
+ executionJobVertex = createExecutionJobVertex(172, NOT_CONFIGURED);
+ Assert.assertEquals(512, executionJobVertex.getMaxParallelism());
+
+ // test round up limit
+ executionJobVertex = createExecutionJobVertex(1 << 15, NOT_CONFIGURED);
+ Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
+
+ // test upper bound
+ try {
+ executionJobVertex = createExecutionJobVertex(1 + (1 << 15), NOT_CONFIGURED);
+ executionJobVertex.getMaxParallelism();
+ Assert.fail();
+ } catch (IllegalArgumentException ignore) {
+ }
+
+ // test configured / trumps computed default
+ executionJobVertex = createExecutionJobVertex(172, 4);
+ Assert.assertEquals(4, executionJobVertex.getMaxParallelism());
+
+
+ // test configured / trumps computed default
+ executionJobVertex = createExecutionJobVertex(4, 1 << 15);
+ Assert.assertEquals(1 << 15, executionJobVertex.getMaxParallelism());
+
+ // test upper bound configured
+ try {
+ executionJobVertex = createExecutionJobVertex(4, 1 + (1 << 15));
+ Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+ } catch (IllegalArgumentException ignore) {
+ }
+
+ // test lower bound configured
+ try {
+ executionJobVertex = createExecutionJobVertex(4, 0);
+ Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+ } catch (IllegalArgumentException ignore) {
+ }
+
+ // test setting a derived value trumps the computed default when nothing is configured
+ executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
+ executionJobVertex.setMaxParallelism(7);
+ Assert.assertEquals(7, executionJobVertex.getMaxParallelism());
+
+ // test lower bound with derived value
+ executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
+ try {
+ executionJobVertex.setMaxParallelism(0);
+ Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+ } catch (IllegalArgumentException ignore) {
+ }
+
+ // test upper bound with derived value
+ executionJobVertex = createExecutionJobVertex(4, NOT_CONFIGURED);
+ try {
+ executionJobVertex.setMaxParallelism(1 + (1 << 15));
+ Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+ } catch (IllegalArgumentException ignore) {
+ }
+
+ // test complain on setting derived value in presence of a configured value
+ executionJobVertex = createExecutionJobVertex(4, 16);
+ try {
+ executionJobVertex.setMaxParallelism(7);
+ Assert.fail(String.valueOf(executionJobVertex.getMaxParallelism()));
+ } catch (IllegalStateException ignore) {
+ }
+
+ }
+
+ //------------------------------------------------------------------------------------------------------
+
+ private static ExecutionJobVertex createExecutionJobVertex(
+ int parallelism,
+ int preconfiguredMaxParallelism) throws JobException, IOException {
+
+ JobVertex jobVertex = new JobVertex("testVertex");
+ jobVertex.setInvokableClass(AbstractInvokable.class);
+ jobVertex.setParallelism(parallelism);
+
+ if (NOT_CONFIGURED != preconfiguredMaxParallelism) {
+ jobVertex.setMaxParallelism(preconfiguredMaxParallelism);
+ }
+
+ ExecutionGraph executionGraphMock = mock(ExecutionGraph.class);
+ when(executionGraphMock.getFutureExecutor()).thenReturn(Executors.directExecutor());
+ ExecutionJobVertex executionJobVertex =
+ new ExecutionJobVertex(executionGraphMock, jobVertex, 1, Time.seconds(10));
+
+ return executionJobVertex;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 8bc39a7..8becd7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -354,8 +354,17 @@ public class ExecutionVertexDeploymentTest {
public void testTddProducedPartitionsLazyScheduling() throws Exception {
TestingUtils.QueuedActionExecutionContext context = TestingUtils.queuedActionExecutionContext();
ExecutionJobVertex jobVertex = getExecutionVertex(new JobVertexID(), context);
- IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
- ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+
+ IntermediateResult result =
+ new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
+
+ ExecutionVertex vertex =
+ new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1));
+
+ ExecutionEdge mockEdge = createMockExecutionEdge(1);
+
+ result.getPartitions()[0].addConsumerGroup();
+ result.getPartitions()[0].addConsumer(mockEdge, 0);
Slot root = mock(Slot.class);
when(root.getSlotNumber()).thenReturn(1);
@@ -374,4 +383,18 @@ public class ExecutionVertexDeploymentTest {
assertEquals(mode.allowLazyDeployment(), desc.sendScheduleOrUpdateConsumersMessage());
}
}
+
+
+
+ private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
+ ExecutionVertex targetVertex = mock(ExecutionVertex.class);
+ ExecutionJobVertex targetJobVertex = mock(ExecutionJobVertex.class);
+
+ when(targetVertex.getJobVertex()).thenReturn(targetJobVertex);
+ when(targetJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
+
+ ExecutionEdge edge = mock(ExecutionEdge.class);
+ when(edge.getTarget()).thenReturn(targetVertex);
+ return edge;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4eb4fd1..f6562a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -83,6 +83,7 @@ public class ResultPartitionTest {
new ResultPartitionID(),
type,
1,
+ 1,
mock(ResultPartitionManager.class),
notifier,
mock(IOManager.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 37ec751..e05fb56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -120,6 +120,7 @@ public class LocalInputChannelTest {
partitionIds[i],
ResultPartitionType.PIPELINED,
parallelism,
+ parallelism,
partitionManager,
partitionConsumableNotifier,
ioManager,
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 1fea6f6..2fd2b98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -624,10 +624,12 @@ public class JobManagerTest {
JobGraph jobGraph = new JobGraph("croissant");
JobVertex jobVertex1 = new JobVertex("cappuccino");
jobVertex1.setParallelism(4);
+ jobVertex1.setMaxParallelism(16);
jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
JobVertex jobVertex2 = new JobVertex("americano");
jobVertex2.setParallelism(4);
+ jobVertex2.setMaxParallelism(16);
jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
jobGraph.addVertex(jobVertex1);
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index e24410e..2fb5fa8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -631,7 +631,7 @@ public class TaskManagerTest extends TestLogger {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
@@ -776,7 +776,7 @@ public class TaskManagerTest extends TestLogger {
IntermediateResultPartitionID partitionId = new IntermediateResultPartitionID();
List<ResultPartitionDeploymentDescriptor> irpdd = new ArrayList<ResultPartitionDeploymentDescriptor>();
- irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, true));
+ irpdd.add(new ResultPartitionDeploymentDescriptor(new IntermediateDataSetID(), partitionId, ResultPartitionType.PIPELINED, 1, 1, true));
InputGateDeploymentDescriptor ircdd =
new InputGateDeploymentDescriptor(
@@ -1486,6 +1486,7 @@ public class TaskManagerTest extends TestLogger {
new IntermediateResultPartitionID(),
ResultPartitionType.PIPELINED,
1,
+ 1,
true);
final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 3e3afd3..7f33275 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -17,9 +17,9 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
@@ -30,7 +30,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
@@ -40,17 +39,18 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.query.QueryableAppendingStateOperator;
import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
@@ -113,8 +113,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
dataStream.getExecutionEnvironment(),
new PartitionTransformation<>(
dataStream.getTransformation(),
- new KeyGroupStreamPartitioner<>(
- keySelector, KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM)));
+ new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
this.keySelector = keySelector;
this.keyType = keyType;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 3fe21fb..9dd60b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -17,8 +17,8 @@
package org.apache.flink.streaming.api.datastream;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -115,18 +115,18 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
/**
* Sets the parallelism for this operator. The degree must be 1 or more.
- *
+ *
* @param parallelism
* The parallelism for this operator.
* @return The operator with set parallelism.
*/
public SingleOutputStreamOperator<T> setParallelism(int parallelism) {
- if (parallelism < 1) {
- throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
- }
- if (nonParallel && parallelism > 1) {
- throw new IllegalArgumentException("The parallelism of non parallel operator must be 1.");
- }
+ Preconditions.checkArgument(parallelism > 0,
+ "The parallelism of an operator must be at least 1.");
+
+ Preconditions.checkArgument(canBeParallel() || parallelism == 1,
+ "The parallelism of non parallel operator must be 1.");
+
transformation.setParallelism(parallelism);
return this;
@@ -143,15 +143,23 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
*/
@PublicEvolving
public SingleOutputStreamOperator<T> setMaxParallelism(int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+ Preconditions.checkArgument(maxParallelism > 0,
+ "The maximum parallelism must be greater than 0.");
+
+ Preconditions.checkArgument(canBeParallel() || maxParallelism == 1,
+ "The maximum parallelism of non parallel operator must be 1.");
transformation.setMaxParallelism(maxParallelism);
return this;
}
+ private boolean canBeParallel() {
+ return !nonParallel;
+ }
+
/**
- * Sets the parallelism of this operator to one.
+ * Sets the parallelism and maximum parallelism of this operator to one.
* And mark this operator cannot set a non-1 degree of parallelism.
*
* @return The operator with only one parallelism.
@@ -159,6 +167,7 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
@PublicEvolving
public SingleOutputStreamOperator<T> forceNonParallel() {
transformation.setParallelism(1);
+ transformation.setMaxParallelism(1);
nonParallel = true;
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 5b4b901..dab0a06 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -48,6 +48,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -166,16 +167,19 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Sets the maximum degree of parallelism defined for the program.
+ * Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive) is Short.MAX_VALUE + 1 (2^15).
*
* The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
* defines the number of key groups used for partitioned state.
*
- * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15
+ * @param maxParallelism Maximum degree of parallelism to be used for the program, with 0 < maxParallelism <= 2^15
*/
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0 && maxParallelism <= (1 << 15),
- "maxParallelism is out of bounds 0 < maxParallelism <= 2^15. Found: " + maxParallelism);
+ Preconditions.checkArgument(maxParallelism > 0 &&
+ maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ "maxParallelism is out of bounds 0 < maxParallelism <= " +
+ KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+
config.setMaxParallelism(maxParallelism);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index a4a8dc7..2f80764 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -17,6 +17,18 @@
package org.apache.flink.streaming.api.graph;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
@@ -38,7 +50,6 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -51,18 +62,6 @@ import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
@@ -358,18 +357,6 @@ public class StreamGraph extends StreamingPlan {
if (partitioner == null) {
partitioner = virtuaPartitionNodes.get(virtualId).f1;
}
-
- if (partitioner instanceof ConfigurableStreamPartitioner) {
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
- ConfigurableStreamPartitioner configurableStreamPartitioner = (ConfigurableStreamPartitioner) partitioner;
-
- // Configure the partitioner with the max parallelism. This is necessary if the
- // partitioner has been created before the maximum parallelism has been set. The
- // maximum parallelism is necessary for the key group mapping.
- configurableStreamPartitioner.configure(downstreamNode.getMaxParallelism());
- }
-
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
} else {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 7ab7494..ddd0515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.transformations.SplitTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +78,9 @@ public class StreamGraphGenerator {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
+ public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+ public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+
// The StreamGraph that is being built, this is initialized at the beginning.
private StreamGraph streamGraph;
@@ -149,25 +151,11 @@ public class StreamGraphGenerator {
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
- // from theExecutionConfig. If this value has not been specified either, then use the
- // parallelism of the operator.
- int maxParallelism = env.getConfig().getMaxParallelism();
-
- if (maxParallelism <= 0) {
-
- int parallelism = transform.getParallelism();
-
- if(parallelism <= 0) {
- parallelism = 1;
- transform.setParallelism(parallelism);
- }
-
- maxParallelism = Math.max(
- MathUtils.roundUpToPowerOfTwo(parallelism + (parallelism / 2)),
- KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM);
+ // from the ExecutionConfig.
+ int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
+ if (globalMaxParallelismFromConfig > 0) {
+ transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
-
- transform.setMaxParallelism(maxParallelism);
}
// call at least once to trigger exceptions about MissingTypeInfo
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0d58ed2..19a3699 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.ArrayList;
@@ -163,7 +162,6 @@ public class StreamNode implements Serializable {
* @param maxParallelism Maximum parallelism to be set
*/
void setMaxParallelism(int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0, "The maximum parallelism must be at least 1.");
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index e306f30..8877c80 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -310,18 +310,7 @@ public class StreamingJobGraphGenerator {
parallelism = jobVertex.getParallelism();
}
- int maxParallelism = streamNode.getMaxParallelism();
-
- // the maximum parallelism specifies the upper bound for the parallelism
- if (parallelism > maxParallelism) {
- // the parallelism should always be smaller or equal than the max parallelism
- throw new IllegalStateException("The maximum parallelism (" + maxParallelism + ") of " +
- "the stream node " + streamNode + " is smaller than the parallelism (" +
- parallelism + "). Increase the maximum parallelism or decrease the parallelism of " +
- "this operator.");
- } else {
- jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
- }
+ jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
if (LOG.isDebugEnabled()) {
LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index f7aecdb..5e1b3e2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.util.Preconditions;
@@ -205,6 +206,10 @@ public abstract class StreamTransformation<T> {
* @param maxParallelism Maximum parallelism for this stream transformation.
*/
public void setMaxParallelism(int maxParallelism) {
+ Preconditions.checkArgument(maxParallelism > 0
+ && maxParallelism <= StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM,
+ "Maximum parallelism must be between 1 and " + StreamGraphGenerator.UPPER_BOUND_MAX_PARALLELISM
+ + ". Found: " + maxParallelism);
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 256fee1..ddbdaea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -76,6 +76,7 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
@Override
public void configure(int maxParallelism) {
+ KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
this.maxParallelism = maxParallelism;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 7771064..6d01795 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -94,6 +95,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
try {
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
+
RecordWriterOutput<?> streamOutput = createStreamOutput(
outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
containingTask.getEnvironment(), containingTask.getName());
@@ -330,6 +332,14 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
+ // we initialize the partitioner here with the number of key groups (aka max. parallelism)
+ if (outputPartitioner instanceof ConfigurableStreamPartitioner) {
+ int numKeyGroups = bufferWriter.getNumTargetKeyGroups();
+ if (0 < numKeyGroups) {
+ ((ConfigurableStreamPartitioner) outputPartitioner).configure(numKeyGroups);
+ }
+ }
+
StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output =
new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());