You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:47:12 UTC
[flink] 01/02: [FLINK-10396] Remove CodebaseType
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd9b5de41eb9935c7ff1350f3b6e5d818104c677
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 19:54:07 2018 +0200
[FLINK-10396] Remove CodebaseType
CodebaseType was used to distinguish between the legacy and the new mode.
This commit removes CodebaseType and the codebase switch in
MiniClusterResource.
This closes #6748.
---
.../runtime/webmonitor/WebFrontendITCase.java | 36 +++-------
.../webmonitor/handlers/JarRunHandlerTest.java | 2 -
.../webmonitor/history/HistoryServerTest.java | 2 -
.../apache/flink/api/scala/ScalaShellITCase.scala | 44 +++++--------
.../api/scala/ScalaShellLocalStartupITCase.scala | 9 +--
.../flink/test/util/MiniClusterResource.java | 76 +---------------------
.../util/MiniClusterResourceConfiguration.java | 24 +------
.../org/apache/flink/test/util/TestBaseUtils.java | 22 -------
.../test/example/client/JobRetrievalITCase.java | 2 -
.../flink/test/misc/AutoParallelismITCase.java | 4 --
.../java/org/apache/flink/yarn/YarnTestBase.java | 9 +--
11 files changed, 28 insertions(+), 202 deletions(-)
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index b90277f..3ae830d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -152,11 +152,7 @@ public class WebFrontendITCase extends TestLogger {
if (notFoundJobConnection.getResponseCode() >= 400) {
// we don't set the content-encoding header
Assert.assertNull(notFoundJobConnection.getContentEncoding());
- if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
- Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
- } else {
- Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
- }
+ Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
} else {
throw new RuntimeException("Request for non-existing job did not return an error.");
}
@@ -280,23 +276,13 @@ public class WebFrontendITCase extends TestLogger {
final Deadline deadline = testTimeout.fromNow();
try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
- if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
- // stop the job
- client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
- HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
- assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
- assertEquals("application/json; charset=UTF-8", response.getType());
- assertEquals("{}", response.getContent());
- } else {
- // stop the job
- client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
- HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
- assertEquals(HttpResponseStatus.OK, response.getStatus());
- assertEquals("application/json; charset=UTF-8", response.getType());
- assertEquals("{}", response.getContent());
- }
+ // stop the job
+ client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+ HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+ assertEquals("application/json; charset=UTF-8", response.getType());
+ assertEquals("{}", response.getContent());
}
// wait for cancellation to finish
@@ -355,11 +341,7 @@ public class WebFrontendITCase extends TestLogger {
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
- if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
- assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
- } else {
- assertEquals(HttpResponseStatus.OK, response.getStatus());
- }
+ assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
assertEquals("application/json; charset=UTF-8", response.getType());
assertEquals("{}", response.getContent());
}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 6427f4d..a2138e1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.junit.ClassRule;
@@ -67,7 +66,6 @@ public class JarRunHandlerTest {
.setConfiguration(config)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
- .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
.build());
clusterResource.before();
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4451e6d..18dd76e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -81,7 +80,6 @@ public class HistoryServerTest extends TestLogger {
.setConfiguration(clusterConfig)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
- .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
.build());
cluster.before();
}
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index c6b43c2..6a41793 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,13 +19,11 @@
package org.apache.flink.api.scala
import java.io._
-import java.util.Objects
import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
import org.apache.flink.runtime.clusterframework.BootstrapTools
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
-import org.apache.flink.test.util.TestBaseUtils.CodebaseType
+import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
import org.junit._
import org.junit.rules.TemporaryFolder
@@ -322,32 +320,20 @@ object ScalaShellITCase {
@BeforeClass
def beforeAll(): Unit = {
- val isNew = TestBaseUtils.isNewCodebase()
- if (isNew) {
- configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
- // set to different than default so not to interfere with ScalaShellLocalStartupITCase
- configuration.setInteger(RestOptions.PORT, 8082)
- val miniConfig = new MiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumSlotsPerTaskManager(parallelism)
- .build()
-
- val miniCluster = new MiniCluster(miniConfig)
- miniCluster.start()
- port = miniCluster.getRestAddress.getPort
- hostname = miniCluster.getRestAddress.getHost
-
- cluster = Some(Left(miniCluster))
- } else {
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
- val standaloneCluster = new StandaloneMiniCluster(configuration)
-
- hostname = standaloneCluster.getHostname
- port = standaloneCluster.getPort
-
- cluster = Some(Right(standaloneCluster))
- }
+ configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+ // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+ configuration.setInteger(RestOptions.PORT, 8082)
+ val miniConfig = new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumSlotsPerTaskManager(parallelism)
+ .build()
+
+ val miniCluster = new MiniCluster(miniConfig)
+ miniCluster.start()
+ port = miniCluster.getRestAddress.getPort
+ hostname = miniCluster.getRestAddress.getHost
+
+ cluster = Some(Left(miniCluster))
}
@AfterClass
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index f13f57b..3952a0f 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.scala
import java.io._
-import org.apache.flink.configuration.{Configuration, CoreOptions}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
import org.junit.rules.TemporaryFolder
import org.junit.{Assert, Rule, Test}
@@ -86,12 +85,6 @@ class ScalaShellLocalStartupITCase extends TestLogger {
System.setOut(new PrintStream(baos))
val configuration = new Configuration()
- val mode = if (TestBaseUtils.isNewCodebase()) {
- CoreOptions.NEW_MODE
- } else {
- CoreOptions.LEGACY_MODE
- }
- configuration.setString(CoreOptions.MODE, mode)
val dir = temporaryFolder.newFolder()
BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 1835fc6..9140bb4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,10 +19,7 @@
package org.apache.flink.test.util;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.DefaultActorSystemLoader;
import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -30,17 +27,12 @@ import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-import akka.actor.ActorSystem;
-import org.junit.Assume;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -49,11 +41,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-
/**
* Starts a Flink mini cluster as a resource and registers the respective
* ExecutionEnvironment and StreamExecutionEnvironment.
@@ -66,8 +53,6 @@ public class MiniClusterResource extends ExternalResource {
private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
- private final TestBaseUtils.CodebaseType codebaseType;
-
private JobExecutorService jobExecutorService;
private ClusterClient<?> clusterClient;
@@ -82,11 +67,6 @@ public class MiniClusterResource extends ExternalResource {
public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
- this.codebaseType = miniClusterResourceConfiguration.getCodebaseType();
- }
-
- public TestBaseUtils.CodebaseType getCodebaseType() {
- return codebaseType;
}
public int getNumberSlots() {
@@ -111,12 +91,9 @@ public class MiniClusterResource extends ExternalResource {
@Override
public void before() throws Exception {
- // verify that we are running in the correct test profile
- Assume.assumeThat(TestBaseUtils.getCodebaseType(), is(equalTo(codebaseType)));
-
temporaryFolder.create();
- startJobExecutorService(codebaseType);
+ startMiniCluster();
numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
@@ -163,57 +140,6 @@ public class MiniClusterResource extends ExternalResource {
}
}
- private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception {
- switch (miniClusterType) {
- case LEGACY:
- startLegacyMiniCluster();
- break;
- case NEW:
- startMiniCluster();
- break;
- default:
- throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
- }
- }
-
- private void startLegacyMiniCluster() throws Exception {
- final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
- configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
- configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
-
- final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
- configuration,
- miniClusterResourceConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED);
-
- jobExecutorService = flinkMiniCluster;
-
- switch (miniClusterResourceConfiguration.getRpcServiceSharing()) {
- case SHARED:
- Option<ActorSystem> actorSystemOption = flinkMiniCluster.firstActorSystem();
- Preconditions.checkState(actorSystemOption.isDefined());
-
- final ActorSystem actorSystem = actorSystemOption.get();
- clusterClient = new StandaloneClusterClient(
- configuration,
- flinkMiniCluster.highAvailabilityServices(),
- true,
- new DefaultActorSystemLoader(actorSystem));
- break;
- case DEDICATED:
- clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
- break;
- }
-
- Configuration restClientConfig = new Configuration();
- restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
- this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
-
- if (flinkMiniCluster.webMonitor().isDefined()) {
- webUIPort = flinkMiniCluster.webMonitor().get().getServerPort();
- }
- }
-
private void startMiniCluster() throws Exception {
final Configuration configuration = miniClusterResourceConfiguration.getConfiguration();
configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index c938920..bd521a2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -37,8 +37,6 @@ public class MiniClusterResourceConfiguration {
private final Time shutdownTimeout;
- private final TestBaseUtils.CodebaseType codebaseType;
-
private final RpcServiceSharing rpcServiceSharing;
MiniClusterResourceConfiguration(
@@ -46,13 +44,11 @@ public class MiniClusterResourceConfiguration {
int numberTaskManagers,
int numberSlotsPerTaskManager,
Time shutdownTimeout,
- TestBaseUtils.CodebaseType codebaseType,
RpcServiceSharing rpcServiceSharing) {
this.configuration = Preconditions.checkNotNull(configuration);
this.numberTaskManagers = numberTaskManagers;
this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
- this.codebaseType = Preconditions.checkNotNull(codebaseType);
this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
}
@@ -72,14 +68,6 @@ public class MiniClusterResourceConfiguration {
return shutdownTimeout;
}
- /**
- * @deprecated Will be irrelevant once the legacy mode has been removed.
- */
- @Deprecated
- public TestBaseUtils.CodebaseType getCodebaseType() {
- return codebaseType;
- }
-
public RpcServiceSharing getRpcServiceSharing() {
return rpcServiceSharing;
}
@@ -93,7 +81,6 @@ public class MiniClusterResourceConfiguration {
private int numberTaskManagers = 1;
private int numberSlotsPerTaskManager = 1;
private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
- private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType();
private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
@@ -117,22 +104,13 @@ public class MiniClusterResourceConfiguration {
return this;
}
- /**
- * @deprecated Will be irrelevant once the legacy mode has been removed.
- */
- @Deprecated
- public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) {
- this.codebaseType = codebaseType;
- return this;
- }
-
public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
this.rpcServiceSharing = rpcServiceSharing;
return this;
}
public MiniClusterResourceConfiguration build() {
- return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+ return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
}
}
}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0f52f2..363327a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,8 +44,6 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
@@ -678,26 +676,6 @@ public class TestBaseUtils extends TestLogger {
throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable.");
}
- @Nonnull
- public static CodebaseType getCodebaseType() {
- return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
- }
-
- public static boolean isNewCodebase() {
- return CodebaseType.NEW == getCodebaseType();
- }
-
- /**
- * Type of the mini cluster to start.
- *
- * @deprecated Will be irrelevant once the legacy mode has been removed.
- */
- @Deprecated
- public enum CodebaseType {
- LEGACY,
- NEW
- }
-
/**
* Comparator for comparable Tuples.
* @param <T> tuple type
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 66a894f..56df46e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -58,7 +57,6 @@ public class JobRetrievalITCase extends TestLogger {
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
- .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
.build());
private RestClusterClient<StandaloneClusterId> client;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 4826a46..f62ccf7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -80,9 +79,6 @@ public class AutoParallelismITCase extends TestLogger {
assertEquals(PARALLELISM, resultCollection.size());
}
catch (Exception ex) {
- if (MINI_CLUSTER_RESOURCE.getCodebaseType() == TestBaseUtils.CodebaseType.LEGACY) {
- throw ex;
- }
assertTrue(
ExceptionUtils.findThrowableWithMessage(ex, ExecutionGraphBuilder.PARALLELISM_AUTO_MAX_ERROR_MESSAGE).isPresent());
}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f..3763f65 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.test.util.TestBaseUtils;
@@ -75,7 +74,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
@@ -153,7 +151,7 @@ public abstract class YarnTestBase extends TestLogger {
protected org.apache.flink.configuration.Configuration flinkConfiguration;
- protected boolean isNewMode;
+ protected final boolean isNewMode = true;
static {
YARN_CONFIGURATION = new YarnConfiguration();
@@ -198,8 +196,6 @@ public abstract class YarnTestBase extends TestLogger {
}
flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
-
- isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
}
/**
@@ -552,9 +548,6 @@ public abstract class YarnTestBase extends TestLogger {
FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
- globalConfiguration.setString(CoreOptions.MODE,
- Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
-
BootstrapTools.writeConfiguration(
globalConfiguration,
new File(tempConfPathForSecureRun, "flink-conf.yaml"));