You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/09/19 14:00:39 UTC

[flink] 02/02: [FLINK-13953][build, runtime] Facilitate enabling new Scheduler in MiniCluster Tests

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34b5399f4effb679baabd8bca312cbf92ec34165
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Thu Sep 12 12:14:11 2019 +0800

    [FLINK-13953][build, runtime] Facilitate enabling new Scheduler in MiniCluster Tests
    
    This closes #9675.
---
 .../minicluster/MiniClusterConfiguration.java      | 18 ++++-
 .../minicluster/MiniClusterConfigurationTest.java  | 77 ++++++++++++++++++++++
 .../junit/category/AlsoRunWithSchedulerNG.java     | 25 +++++++
 pom.xml                                            | 16 +++++
 4 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index a39bfd4..20eee1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -38,6 +39,8 @@ import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
  */
 public class MiniClusterConfiguration {
 
+	static final String SCHEDULER_TYPE_KEY = JobManagerOptions.SCHEDULER.key();
+
 	private final UnmodifiableConfiguration configuration;
 
 	private final int numTaskManagers;
@@ -57,12 +60,25 @@ public class MiniClusterConfiguration {
 			RpcServiceSharing rpcServiceSharing,
 			@Nullable String commonBindAddress) {
 
-		this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
+		this.configuration = generateConfiguration(Preconditions.checkNotNull(configuration));
 		this.numTaskManagers = numTaskManagers;
 		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
 		this.commonBindAddress = commonBindAddress;
 	}
 
+	private UnmodifiableConfiguration generateConfiguration(final Configuration configuration) {
+		String schedulerType = System.getProperty(SCHEDULER_TYPE_KEY);
+		if (StringUtils.isNullOrWhitespaceOnly(schedulerType)) {
+			schedulerType = JobManagerOptions.SCHEDULER.defaultValue();
+		}
+
+		if (!configuration.contains(JobManagerOptions.SCHEDULER)) {
+			configuration.setString(JobManagerOptions.SCHEDULER, schedulerType);
+		}
+
+		return new UnmodifiableConfiguration(configuration);
+	}
+
 	// ------------------------------------------------------------------------
 	//  getters
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterConfigurationTest.java
new file mode 100644
index 0000000..a50e9e9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterConfigurationTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link MiniClusterConfiguration}.
+ */
+public class MiniClusterConfigurationTest extends TestLogger {
+
+	private static final String TEST_SCHEDULER_NAME = "test-scheduler";
+
+	private static String priorSchedulerType;
+
+	@BeforeClass
+	public static void setUp() {
+		priorSchedulerType = System.getProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY);
+		System.setProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY, TEST_SCHEDULER_NAME);
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		if (priorSchedulerType != null) {
+			System.setProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY, priorSchedulerType);
+			priorSchedulerType = null;
+		} else {
+			System.clearProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY);
+		}
+	}
+
+	@Test
+	public void testSchedulerType_setViaSystemProperty() {
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().build();
+
+		Assert.assertEquals(
+			TEST_SCHEDULER_NAME,
+			miniClusterConfiguration.getConfiguration().getString(JobManagerOptions.SCHEDULER));
+	}
+
+	@Test
+	public void testSchedulerType_notOverriddenIfExistingInConfig() {
+		final Configuration config = new Configuration();
+		config.setString(JobManagerOptions.SCHEDULER, JobManagerOptions.SCHEDULER.defaultValue());
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.build();
+
+		Assert.assertEquals(
+			JobManagerOptions.SCHEDULER.defaultValue(),
+			miniClusterConfiguration.getConfiguration().getString(JobManagerOptions.SCHEDULER));
+	}
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/category/AlsoRunWithSchedulerNG.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/category/AlsoRunWithSchedulerNG.java
new file mode 100644
index 0000000..ca52ca1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/category/AlsoRunWithSchedulerNG.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.category;
+
+/**
+ * Category marker interface to run tests with SchedulerNG.
+ */
+public interface AlsoRunWithSchedulerNG {
+}
diff --git a/pom.xml b/pom.xml
index 722eed2..5a8b598 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,6 +150,8 @@ under the License.
 		<hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
 		<jamicmp.referenceVersion>1.8.0</jamicmp.referenceVersion>
 		<japicmp.outputDir>tools/japicmp-output</japicmp.outputDir>
+		<test.scheduler.type></test.scheduler.type>
+		<test.groups></test.groups>
 	</properties>
 
 	<dependencies>
@@ -639,6 +641,18 @@ under the License.
 	</dependencyManagement>
 
 	<profiles>
+		<profile>
+			<id>scheduler-ng</id>
+			<properties>
+				<scheduler.type>ng</scheduler.type>
+				<test.groups>org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG</test.groups>
+			</properties>
+			<activation>
+				<property>
+					<name>scheduler-ng</name>
+				</property>
+			</activation>
+		</profile>
 
 		<profile>
 			<id>scala-2.11</id>
@@ -1426,11 +1440,13 @@ under the License.
 				<artifactId>maven-surefire-plugin</artifactId>
 				<version>2.22.1</version>
 				<configuration>
+					<groups>${test.groups}</groups>
 					<forkCount>${flink.forkCount}</forkCount>
 					<reuseForks>${flink.reuseForks}</reuseForks>
 					<systemPropertyVariables>
 						<forkNumber>0${surefire.forkNumber}</forkNumber>
 						<log4j.configuration>${log4j.configuration}</log4j.configuration>
+						<jobmanager.scheduler>${test.scheduler.type}</jobmanager.scheduler>
 					</systemPropertyVariables>
 					<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC</argLine>
 				</configuration>