You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/09/19 14:00:39 UTC
[flink] 02/02: [FLINK-13953][build,
runtime] Facilitate enabling new Scheduler in MiniCluster Tests
This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 34b5399f4effb679baabd8bca312cbf92ec34165
Author: zhuzhu.zz <zh...@alibaba-inc.com>
AuthorDate: Thu Sep 12 12:14:11 2019 +0800
[FLINK-13953][build, runtime] Facilitate enabling new Scheduler in MiniCluster Tests
This closes #9675.
---
.../minicluster/MiniClusterConfiguration.java | 18 ++++-
.../minicluster/MiniClusterConfigurationTest.java | 77 ++++++++++++++++++++++
.../junit/category/AlsoRunWithSchedulerNG.java | 25 +++++++
pom.xml | 16 +++++
4 files changed, 135 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index a39bfd4..20eee1b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
@@ -38,6 +39,8 @@ import static org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
*/
public class MiniClusterConfiguration {
+ static final String SCHEDULER_TYPE_KEY = JobManagerOptions.SCHEDULER.key();
+
private final UnmodifiableConfiguration configuration;
private final int numTaskManagers;
@@ -57,12 +60,25 @@ public class MiniClusterConfiguration {
RpcServiceSharing rpcServiceSharing,
@Nullable String commonBindAddress) {
- this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration));
+ this.configuration = generateConfiguration(Preconditions.checkNotNull(configuration));
this.numTaskManagers = numTaskManagers;
this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
this.commonBindAddress = commonBindAddress;
}
+ private UnmodifiableConfiguration generateConfiguration(final Configuration configuration) {
+ String schedulerType = System.getProperty(SCHEDULER_TYPE_KEY);
+ if (StringUtils.isNullOrWhitespaceOnly(schedulerType)) {
+ schedulerType = JobManagerOptions.SCHEDULER.defaultValue();
+ }
+
+ if (!configuration.contains(JobManagerOptions.SCHEDULER)) {
+ configuration.setString(JobManagerOptions.SCHEDULER, schedulerType);
+ }
+
+ return new UnmodifiableConfiguration(configuration);
+ }
+
// ------------------------------------------------------------------------
// getters
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterConfigurationTest.java
new file mode 100644
index 0000000..a50e9e9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterConfigurationTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for the {@link MiniClusterConfiguration}.
+ */
+public class MiniClusterConfigurationTest extends TestLogger {
+
+ private static final String TEST_SCHEDULER_NAME = "test-scheduler";
+
+ private static String priorSchedulerType;
+
+ @BeforeClass
+ public static void setUp() {
+ priorSchedulerType = System.getProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY);
+ System.setProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY, TEST_SCHEDULER_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (priorSchedulerType != null) {
+ System.setProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY, priorSchedulerType);
+ priorSchedulerType = null;
+ } else {
+ System.clearProperty(MiniClusterConfiguration.SCHEDULER_TYPE_KEY);
+ }
+ }
+
+ @Test
+ public void testSchedulerType_setViaSystemProperty() {
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().build();
+
+ Assert.assertEquals(
+ TEST_SCHEDULER_NAME,
+ miniClusterConfiguration.getConfiguration().getString(JobManagerOptions.SCHEDULER));
+ }
+
+ @Test
+ public void testSchedulerType_notOverriddenIfExistingInConfig() {
+ final Configuration config = new Configuration();
+ config.setString(JobManagerOptions.SCHEDULER, JobManagerOptions.SCHEDULER.defaultValue());
+
+ final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+ .setConfiguration(config)
+ .build();
+
+ Assert.assertEquals(
+ JobManagerOptions.SCHEDULER.defaultValue(),
+ miniClusterConfiguration.getConfiguration().getString(JobManagerOptions.SCHEDULER));
+ }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/category/AlsoRunWithSchedulerNG.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/category/AlsoRunWithSchedulerNG.java
new file mode 100644
index 0000000..ca52ca1
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/junit/category/AlsoRunWithSchedulerNG.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.testutils.junit.category;
+
+/**
+ * Category marker interface to run tests with SchedulerNG.
+ */
+public interface AlsoRunWithSchedulerNG {
+}
diff --git a/pom.xml b/pom.xml
index 722eed2..5a8b598 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,6 +150,8 @@ under the License.
<hivemetastore.hadoop.version>2.7.5</hivemetastore.hadoop.version>
<jamicmp.referenceVersion>1.8.0</jamicmp.referenceVersion>
<japicmp.outputDir>tools/japicmp-output</japicmp.outputDir>
+ <test.scheduler.type></test.scheduler.type>
+ <test.groups></test.groups>
</properties>
<dependencies>
@@ -639,6 +641,18 @@ under the License.
</dependencyManagement>
<profiles>
+ <profile>
+ <id>scheduler-ng</id>
+ <properties>
+ <scheduler.type>ng</scheduler.type>
+ <test.groups>org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG</test.groups>
+ </properties>
+ <activation>
+ <property>
+ <name>scheduler-ng</name>
+ </property>
+ </activation>
+ </profile>
<profile>
<id>scala-2.11</id>
@@ -1426,11 +1440,13 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
<configuration>
+ <groups>${test.groups}</groups>
<forkCount>${flink.forkCount}</forkCount>
<reuseForks>${flink.reuseForks}</reuseForks>
<systemPropertyVariables>
<forkNumber>0${surefire.forkNumber}</forkNumber>
<log4j.configuration>${log4j.configuration}</log4j.configuration>
+ <jobmanager.scheduler>${test.scheduler.type}</jobmanager.scheduler>
</systemPropertyVariables>
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC</argLine>
</configuration>