You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 08:39:05 UTC

[3/3] flink git commit: [FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()

Before, there were some checks in
StreamExecutionEnvironment.set(Max)Parallelism() but a user could
circumvent these by using the ExecutionConfig directly. Now, all checks
are moved to the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99fb80be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99fb80be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99fb80be

Branch: refs/heads/release-1.2
Commit: 99fb80be773499907d379553010dd999214f64fb
Parents: d3b275f
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 14:35:37 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:43:42 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 35 +++++++++----
 .../environment/StreamExecutionEnvironment.java |  8 ---
 .../StreamExecutionEnvironmentTest.java         | 54 ++++++++++++++++++++
 .../api/graph/StreamGraphGeneratorTest.java     |  3 ++
 ...tractEventTimeWindowCheckpointingITCase.java |  2 +-
 5 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 14245ed..8d5fc90 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -84,8 +84,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	public static final int PARALLELISM_UNKNOWN = -2;
 
 	/**
-	 * The default lower bound for max parallelism if nothing was configured by the user. We have this so allow users
-	 * some degree of scale-up in case they forgot to configure maximum parallelism explicitly.
+	 * The default lower bound for max parallelism if nothing was configured by the user. We have
+	 * this to allow users some degree of scale-up in case they forgot to configure maximum
+	 * parallelism explicitly.
 	 */
 	public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;
 
@@ -289,13 +290,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		if (parallelism != PARALLELISM_UNKNOWN) {
-			if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
-				throw new IllegalArgumentException(
-					"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
-			}
-			this.parallelism = parallelism;
-		}
+		checkArgument(parallelism != PARALLELISM_UNKNOWN, "Cannot specify UNKNOWN_PARALLELISM.");
+		checkArgument(
+				parallelism >= 1 || parallelism == PARALLELISM_DEFAULT,
+				"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT " +
+						"(use system default).");
+		checkArgument(
+				maxParallelism == -1 || parallelism <= maxParallelism,
+				"The specified parallelism must be smaller or equal to the maximum parallelism.");
+		checkArgument(
+				maxParallelism == -1 || parallelism != PARALLELISM_DEFAULT,
+				"Default parallelism cannot be specified when maximum parallelism is specified");
+		this.parallelism = parallelism;
 		return this;
 	}
 
@@ -322,7 +328,18 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	 */
 	@PublicEvolving
 	public void setMaxParallelism(int maxParallelism) {
+		checkArgument(
+				parallelism != PARALLELISM_DEFAULT,
+				"A maximum parallelism can only be specified with an explicitly specified " +
+						"parallelism.");
 		checkArgument(maxParallelism > 0, "The maximum parallelism must be greater than 0.");
+		checkArgument(
+				maxParallelism >= parallelism,
+				"The maximum parallelism must be larger than the parallelism.");
+		checkArgument(
+				maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
+				"maxParallelism is out of bounds 0 < maxParallelism <= " +
+						UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
 		this.maxParallelism = maxParallelism;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 640915c..b16298c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -167,9 +167,6 @@ public abstract class StreamExecutionEnvironment {
 	 * @param parallelism The parallelism
 	 */
 	public StreamExecutionEnvironment setParallelism(int parallelism) {
-		if (parallelism < 1) {
-			throw new IllegalArgumentException("parallelism must be at least one.");
-		}
 		config.setParallelism(parallelism);
 		return this;
 	}
@@ -183,11 +180,6 @@ public abstract class StreamExecutionEnvironment {
 	 * @param maxParallelism Maximum degree of parallelism to be used for the program., with 0 < maxParallelism <= 2^15 - 1
 	 */
 	public StreamExecutionEnvironment setMaxParallelism(int maxParallelism) {
-		Preconditions.checkArgument(maxParallelism > 0 &&
-						maxParallelism <= ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM,
-				"maxParallelism is out of bounds 0 < maxParallelism <= " +
-						ExecutionConfig.UPPER_BOUND_MAX_PARALLELISM + ". Found: " + maxParallelism);
-
 		config.setMaxParallelism(maxParallelism);
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
index d29c833..fd27179 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -36,7 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.net.URL;
 import java.util.Arrays;
@@ -52,6 +54,10 @@ import static org.mockito.Mockito.mock;
 
 public class StreamExecutionEnvironmentTest {
 
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
+
 	@Test
 	public void fromElementsWithBaseTypeTest1() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -156,6 +162,53 @@ public class StreamExecutionEnvironmentTest {
 	}
 
 	@Test
+	public void testMaxParallelismMustBeBiggerEqualParallelism() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(10);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setMaxParallelism(5);
+	}
+
+	@Test
+	public void testParallelismMustBeSmallerEqualMaxParallelism() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.setParallelism(10);
+		env.setMaxParallelism(20);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setParallelism(30);
+	}
+
+	@Test
+	public void testSetDefaultParallelismNotAllowedWhenMaxParallelismSpecified() {
+		final int defaultParallelism = 20;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+		env.setParallelism(10);
+		env.setMaxParallelism(15);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+	}
+
+	@Test
+	public void testSetMaxParallelismNotAllowedWithDefaultParallelism() {
+		final int defaultParallelism = 20;
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultParallelism);
+
+		env.setParallelism(10);
+		env.setMaxParallelism(15);
+
+		exception.expect(IllegalArgumentException.class);
+		env.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
+	}
+
+	@Test
 	public void testParallelismBounds() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -206,6 +259,7 @@ public class StreamExecutionEnvironmentTest {
 		Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
 
 		// configured value after generating
+		env.setParallelism(21);
 		env.setMaxParallelism(42);
 		env.getStreamGraph().getJobGraph();
 		Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 5fdacd4..fbbb5d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -247,6 +247,7 @@ public class StreamGraphGeneratorTest {
 	public void testSetupOfKeyGroupPartitioner() {
 		int maxParallelism = 42;
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setParallelism(12);
 		env.getConfig().setMaxParallelism(maxParallelism);
 
 		DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -278,6 +279,7 @@ public class StreamGraphGeneratorTest {
 		int keyedResult2MaxParallelism = 17;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setParallelism(12);
 		env.getConfig().setMaxParallelism(globalMaxParallelism);
 
 		DataStream<Integer> source = env.fromElements(1, 2, 3);
@@ -384,6 +386,7 @@ public class StreamGraphGeneratorTest {
 		DataStream<Integer> input1 = env.fromElements(1, 2, 3, 4).setMaxParallelism(128);
 		DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4).setMaxParallelism(129);
 
+		env.setParallelism(12);
 		env.getConfig().setMaxParallelism(maxParallelism);
 
 		DataStream<Integer> keyedResult = input1.connect(input2).keyBy(

http://git-wip-us.apache.org/repos/asf/flink/blob/99fb80be/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 1911f44..ee417ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -290,8 +290,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 					"localhost", cluster.getLeaderRPCPort());
 
-			env.setMaxParallelism(2 * PARALLELISM);
 			env.setParallelism(PARALLELISM);
+			env.setMaxParallelism(2 * PARALLELISM);
 			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 			env.enableCheckpointing(100);
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));