You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 08:39:05 UTC
[3/3] flink git commit: [FLINK-5808] Add proper checks in
setParallelism()/setMaxParallelism()
[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()
Before, there where some checks in
StreamExecutionEnvironment.set(Max)Parallelism() but a user would
circumvent these if using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99fb80be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99fb80be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99fb80be
Branch: refs/heads/release-1.2
Commit: 99fb80be773499907d379553010dd999214f64fb
Parents: d3b275f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 14:35:37 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:43:42 2017 +0100
----------------------------------------------------------------------
.../flink/api/common/ExecutionConfig.java | 35 +++++++++----
.../environment/StreamExecutionEnvironment.java | 8 ---
.../StreamExecutionEnvironmentTest.java | 54 ++++++++++++++++++++
.../api/graph/StreamGraphGeneratorTest.java | 3 ++
...tractEventTimeWindowCheckpointingITCase.java | 2 +-
5 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 14245ed..8d5fc90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -84,8 +84,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
public static final int PARALLELISM_UNKNOWN = -2;
/**
- * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
- * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+ * The default lower bound for max parallelism if nothing was configured by the user. We have
+ * this to allow users some degree of scale-up in case they forgot to configure maximum
+ * parallelism explicitly.
*/
public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
@@ -289,13 +290,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
* @param parallelism The parallelism to use
*/
public ExecutionConfig setParallelism(int parallelism) {
- if (parallelism != PARALLELISM_UNKNOWN) {
- if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
- throw new IllegalArgumentException(
- "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
- }
- this.parallelism = parallelism;
- }
+ checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
+ checkArgument(
+ parallelism >= 1 || parallelism == PARALLELISM_DEFAULT,
+ "Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " +
+ "(use system default).");
+ checkArgument(
+ maxParallelism == -1 || parallelism <= maxParallelism,
+ "The specified parallelism must be smaller or equal to the maximum parallelism.");
+ checkArgument(
+ maxParallelism == -1 || parallelism != PARALLELISM_DEFAULT,
+ "Default parallelism cannot be specified when maximum parallelism is specified");
+ this.parallelism = parallelism;
return this;
}
@@ -322,7 +328,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
*/
@PublicEvolving
public void setMaxParallelism(int maxParallelism) {
+ checkArgument(
+ parallelism != PARALLELISM_DEFAULT,
+ "A maximum parallelism can only be specified with an explicitly specified " +
+ "parallelism.");
checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+ checkArgument(
+ maxParallelism >= parallelism,
+ "The maximum parallelism must be larger than the parallelism.");
+ checkArgument(
+ maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+ "maxParallelism is out of bounds 0 < maxParallelism <= " +
+ UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
this.maxParallelism = maxParallelism;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 640915c..b16298c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -167,9 +167,6 @@ public abstract class StreamExecutionEnvironment {
* @param parallelism The parallelism
*/
public StreamExecutionEnvironment setParallelism(int parallelism) {
- if (parallelism < 1) {
- throw new IllegalArgumentException("parallelism must be at least one.");
- }
config.setParallelism(parallelism);
return this;
}
@@ -183,11 +180,6 @@ public abstract class StreamExecutionEnvironment {
* @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1
*/
public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
- Preconditions.checkArgument(maxParallelism > 0 &&
- maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
- "maxParallelism is out of bounds 0 < maxParallelism <= " +
- ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
-
config.setMaxParallelism(maxParallelism);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index d29c833..fd27179 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -36,7 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.net.URL;
import java.util.Arrays;
@@ -52,6 +54,10 @@ import static org.mockito.Mockito.mock;
public class StreamExecutionEnvironmentTest {
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+
@Test
public void fromElementsWithBaseTypeTest1() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -156,6 +162,53 @@ public class StreamExecutionEnvironmentTest {
}
@Test
+ public void testMaxParallelismMustBeBiggerEqualParallelism() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(10);
+
+ exception.expect(IllegalArgumentException.class);
+ env.setMaxParallelism(5);
+ }
+
+ @Test
+ public void testParallelismMustBeSmallerEqualMaxParallelism() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(10);
+ env.setMaxParallelism(20);
+
+ exception.expect(IllegalArgumentException.class);
+ env.setParallelism(30);
+ }
+
+ @Test
+ public void testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() {
+ final int defaultParallelism = 20;
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+ env.setParallelism(10);
+ env.setMaxParallelism(15);
+
+ exception.expect(IllegalArgumentException.class);
+ env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+
+ @Test
+ public void testSetMaxParallelismNotAllowedWithDefaultParallelism() {
+ final int defaultParallelism = 20;
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+ env.setParallelism(10);
+ env.setMaxParallelism(15);
+
+ exception.expect(IllegalArgumentException.class);
+ env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+ }
+
+ @Test
public void testParallelismBounds() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -206,6 +259,7 @@ public class StreamExecutionEnvironmentTest {
Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
// configured value after generating
+ env.setParallelism(21);
env.setMaxParallelism(42);
env.getStreamGraph().getJobGraph();
Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 5fdacd4..fbbb5d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -247,6 +247,7 @@ public class StreamGraphGeneratorTest {
public void testSetupOfKeyGroupPartitioner() {
int maxParallelism = 42;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setParallelism(12);
env.getConfig().setMaxParallelism(maxParallelism);
DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -278,6 +279,7 @@ public class StreamGraphGeneratorTest {
int keyedResult2MaxParallelism = 17;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setParallelism(12);
env.getConfig().setMaxParallelism(globalMaxParallelism);
DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -384,6 +386,7 @@ public class StreamGraphGeneratorTest {
DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
+ env.setParallelism(12);
env.getConfig().setMaxParallelism(maxParallelism);
DataStream<Integer> keyedResult = input1.connect(input2).keyBy(
http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 1911f44..ee417ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -290,8 +290,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", cluster.getLeaderRPCPort());
- env.setMaxParallelism(2 * PARALLELISM);
env.setParallelism(PARALLELISM);
+ env.setMaxParallelism(2 * PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));