You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/17 22:04:00 UTC
[2/3] flink git commit: [FLINK-5808] Move max keygroup constants to
ExecutionConfig
[FLINK-5808] Move max keygroup constants to ExecutionConfig
We need to have them there if we want to properly test the arguments of
setMaxParallelism() in the ExecutionConfig itself.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4fbae36
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4fbae36
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4fbae36
Branch: refs/heads/master
Commit: e4fbae36207c563363eed39886c24eea51d7db01
Parents: 9cfae89
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 14:37:26 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 17 21:34:54 2017 +0100
----------------------------------------------------------------------
.../apache/flink/api/common/ExecutionConfig.java | 9 +++++++++
.../runtime/executiongraph/ExecutionJobVertex.java | 6 +++---
.../runtime/executiongraph/ExecutionVertex.java | 4 ++--
.../runtime/state/KeyGroupRangeAssignment.java | 16 ++++------------
.../api/environment/StreamExecutionEnvironment.java | 5 ++---
.../streaming/api/graph/StreamGraphGenerator.java | 6 +++---
6 files changed, 23 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1..9af9cff 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -83,6 +83,15 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
public static final int PARALLELISM_UNKNOWN = -2;
+ /**
+ * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
+ * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+ */
+ public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
+
+ /** The (inclusive) upper bound for max parallelism */
+ public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
+
private static final long DEFAULT_RESTART_DELAY = 10000L;
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 852d530..545315f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -225,13 +225,13 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
private void setMaxParallelismInternal(int maxParallelism) {
if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
- maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+ maxParallelism = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
}
Preconditions.checkArgument(maxParallelism > 0
- && maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ && maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
"Overriding max parallelism is not in valid bounds (1..%s), found: %s",
- KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
+ ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 21af73a..9693b97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.JobException;
@@ -40,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.JobManagerOptions;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -663,7 +663,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
//TODO this case only exists for test, currently there has to be exactly one consumer in real jobs!
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
partition,
- KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
lazyScheduling));
} else {
Preconditions.checkState(1 == consumers.size(),
http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
index 62bf3f6..bf0611b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java
@@ -18,20 +18,12 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
public final class KeyGroupRangeAssignment {
- /**
- * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
- * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
- */
- public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
-
- /** The (inclusive) upper bound for max parallelism */
- public static final int UPPER_BOUND_MAX_PARALLELISM = 1 << 15;
-
private KeyGroupRangeAssignment() {
throw new AssertionError();
}
@@ -130,13 +122,13 @@ public final class KeyGroupRangeAssignment {
return Math.min(
Math.max(
MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
- DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
- UPPER_BOUND_MAX_PARALLELISM);
+ ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
+ ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM);
}
public static void checkParallelismPreconditions(int parallelism) {
Preconditions.checkArgument(parallelism > 0
- && parallelism <= UPPER_BOUND_MAX_PARALLELISM,
+ && parallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
"Operator parallelism not within bounds: " + parallelism);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ac3eadb..f443597 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -49,7 +49,6 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -186,9 +185,9 @@ public abstract class StreamExecutionEnvironment {
*/
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
Preconditions.checkArgument(maxParallelism > 0 &&
- maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
+ maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
"maxParallelism is out of bounds 0 < maxParallelism <= " +
- KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
+ ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
config.setMaxParallelism(maxParallelism);
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/e4fbae36/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index b3b6529..2defbef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
@@ -78,8 +78,8 @@ public class StreamGraphGenerator {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
- public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
- public static final int UPPER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+ public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = ExecutionConfig.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
+ public static final int UPPER_BOUND_MAX_PARALLELISM = ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM;
// The StreamGraph that is being built, this is initialized at the beginning.
private final StreamGraph streamGraph;