You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:47:12 UTC

[flink] 01/02: [FLINK-10396] Remove CodebaseType

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd9b5de41eb9935c7ff1350f3b6e5d818104c677
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 19:54:07 2018 +0200

    [FLINK-10396] Remove CodebaseType
    
    CodebaseType was used to distinguish between the legacy and new mode.
    This commit removes the CodebaseType and the codebase switch in the
    MiniClusterResource.
    
    This closes #6748.
---
 .../runtime/webmonitor/WebFrontendITCase.java      | 36 +++-------
 .../webmonitor/handlers/JarRunHandlerTest.java     |  2 -
 .../webmonitor/history/HistoryServerTest.java      |  2 -
 .../apache/flink/api/scala/ScalaShellITCase.scala  | 44 +++++--------
 .../api/scala/ScalaShellLocalStartupITCase.scala   |  9 +--
 .../flink/test/util/MiniClusterResource.java       | 76 +---------------------
 .../util/MiniClusterResourceConfiguration.java     | 24 +------
 .../org/apache/flink/test/util/TestBaseUtils.java  | 22 -------
 .../test/example/client/JobRetrievalITCase.java    |  2 -
 .../flink/test/misc/AutoParallelismITCase.java     |  4 --
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  9 +--
 11 files changed, 28 insertions(+), 202 deletions(-)

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index b90277f..3ae830d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -152,11 +152,7 @@ public class WebFrontendITCase extends TestLogger {
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
-				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
-			} else {
-				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
-			}
+			Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
 		} else {
 			throw new RuntimeException("Request for non-existing job did not return an error.");
 		}
@@ -280,23 +276,13 @@ public class WebFrontendITCase extends TestLogger {
 		final Deadline deadline = testTimeout.fromNow();
 
 		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
-			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
-				// stop the job
-				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
-				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
-				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-				assertEquals("application/json; charset=UTF-8", response.getType());
-				assertEquals("{}", response.getContent());
-			} else {
-				// stop the job
-				client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
-				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
-				assertEquals("application/json; charset=UTF-8", response.getType());
-				assertEquals("{}", response.getContent());
-			}
+			// stop the job
+			client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+			assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+			assertEquals("application/json; charset=UTF-8", response.getType());
+			assertEquals("{}", response.getContent());
 		}
 
 		// wait for cancellation to finish
@@ -355,11 +341,7 @@ public class WebFrontendITCase extends TestLogger {
 			HttpTestClient.SimpleHttpResponse response = client
 				.getNextResponse(deadline.timeLeft());
 
-			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
-				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-			} else {
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
-			}
+			assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 			assertEquals("application/json; charset=UTF-8", response.getType());
 			assertEquals("{}", response.getContent());
 		}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 6427f4d..a2138e1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.ClassRule;
@@ -67,7 +66,6 @@ public class JarRunHandlerTest {
 				.setConfiguration(config)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(1)
-				.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
 				.build());
 		clusterResource.before();
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4451e6d..18dd76e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -81,7 +80,6 @@ public class HistoryServerTest extends TestLogger {
 				.setConfiguration(clusterConfig)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(1)
-				.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
 				.build());
 		cluster.before();
 	}
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index c6b43c2..6a41793 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,13 +19,11 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
-import org.apache.flink.test.util.TestBaseUtils.CodebaseType
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -322,32 +320,20 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))
   }
 
   @AfterClass
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index f13f57b..3952a0f 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.apache.flink.configuration.{Configuration, CoreOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
 import org.junit.{Assert, Rule, Test}
@@ -86,12 +85,6 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    val mode = if (TestBaseUtils.isNewCodebase()) {
-      CoreOptions.NEW_MODE
-    } else {
-      CoreOptions.LEGACY_MODE
-    }
-    configuration.setString(CoreOptions.MODE, mode)
 
     val dir = temporaryFolder.newFolder()
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 1835fc6..9140bb4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,10 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.DefaultActorSystemLoader;
 import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -30,17 +27,12 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
-import org.junit.Assume;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -49,11 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-
 /**
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
@@ -66,8 +53,6 @@ public class MiniClusterResource extends ExternalResource {
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 
-	private final TestBaseUtils.CodebaseType codebaseType;
-
 	private JobExecutorService jobExecutorService;
 
 	private ClusterClient<?> clusterClient;
@@ -82,11 +67,6 @@ public class MiniClusterResource extends ExternalResource {
 
 	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
-		this.codebaseType = miniClusterResourceConfiguration.getCodebaseType();
-	}
-
-	public TestBaseUtils.CodebaseType getCodebaseType() {
-		return codebaseType;
 	}
 
 	public int getNumberSlots() {
@@ -111,12 +91,9 @@ public class MiniClusterResource extends ExternalResource {
 
 	@Override
 	public void before() throws Exception {
-		// verify that we are running in the correct test profile
-		Assume.assumeThat(TestBaseUtils.getCodebaseType(), is(equalTo(codebaseType)));
-
 		temporaryFolder.create();
 
-		startJobExecutorService(codebaseType);
+		startMiniCluster();
 
 		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
 
@@ -163,57 +140,6 @@ public class MiniClusterResource extends ExternalResource {
 		}
 	}
 
-	private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception {
-		switch (miniClusterType) {
-			case LEGACY:
-				startLegacyMiniCluster();
-				break;
-			case NEW:
-				startMiniCluster();
-				break;
-			default:
-				throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
-		}
-	}
-
-	private void startLegacyMiniCluster() throws Exception {
-		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
-		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
-		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
-		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
-
-		final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
-			configuration,
-			miniClusterResourceConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED);
-
-		jobExecutorService = flinkMiniCluster;
-
-		switch (miniClusterResourceConfiguration.getRpcServiceSharing()) {
-			case SHARED:
-				Option<ActorSystem> actorSystemOption = flinkMiniCluster.firstActorSystem();
-				Preconditions.checkState(actorSystemOption.isDefined());
-
-				final ActorSystem actorSystem = actorSystemOption.get();
-				clusterClient = new StandaloneClusterClient(
-					configuration,
-					flinkMiniCluster.highAvailabilityServices(),
-					true,
-					new DefaultActorSystemLoader(actorSystem));
-				break;
-			case DEDICATED:
-				clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
-				break;
-		}
-
-		Configuration restClientConfig = new Configuration();
-		restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
-		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
-
-		if (flinkMiniCluster.webMonitor().isDefined()) {
-			webUIPort = flinkMiniCluster.webMonitor().get().getServerPort();
-		}
-	}
-
 	private void startMiniCluster() throws Exception {
 		final Configuration configuration = miniClusterResourceConfiguration.getConfiguration();
 		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index c938920..bd521a2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -37,8 +37,6 @@ public class MiniClusterResourceConfiguration {
 
 	private final Time shutdownTimeout;
 
-	private final TestBaseUtils.CodebaseType codebaseType;
-
 	private final RpcServiceSharing rpcServiceSharing;
 
 	MiniClusterResourceConfiguration(
@@ -46,13 +44,11 @@ public class MiniClusterResourceConfiguration {
 		int numberTaskManagers,
 		int numberSlotsPerTaskManager,
 		Time shutdownTimeout,
-		TestBaseUtils.CodebaseType codebaseType,
 		RpcServiceSharing rpcServiceSharing) {
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.numberTaskManagers = numberTaskManagers;
 		this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
 		this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
-		this.codebaseType = Preconditions.checkNotNull(codebaseType);
 		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
 	}
 
@@ -72,14 +68,6 @@ public class MiniClusterResourceConfiguration {
 		return shutdownTimeout;
 	}
 
-	/**
-	 * @deprecated Will be irrelevant once the legacy mode has been removed.
-	 */
-	@Deprecated
-	public TestBaseUtils.CodebaseType getCodebaseType() {
-		return codebaseType;
-	}
-
 	public RpcServiceSharing getRpcServiceSharing() {
 		return rpcServiceSharing;
 	}
@@ -93,7 +81,6 @@ public class MiniClusterResourceConfiguration {
 		private int numberTaskManagers = 1;
 		private int numberSlotsPerTaskManager = 1;
 		private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
-		private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType();
 
 		private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
 
@@ -117,22 +104,13 @@ public class MiniClusterResourceConfiguration {
 			return this;
 		}
 
-		/**
-		 * @deprecated Will be irrelevant once the legacy mode has been removed.
-		 */
-		@Deprecated
-		public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) {
-			this.codebaseType = codebaseType;
-			return this;
-		}
-
 		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
 			this.rpcServiceSharing = rpcServiceSharing;
 			return this;
 		}
 
 		public MiniClusterResourceConfiguration build() {
-			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
 		}
 	}
 }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0f52f2..363327a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,8 +44,6 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -678,26 +676,6 @@ public class TestBaseUtils extends TestLogger {
 		throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable.");
 	}
 
-	@Nonnull
-	public static CodebaseType getCodebaseType() {
-		return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
-	}
-
-	public static boolean isNewCodebase() {
-		return CodebaseType.NEW == getCodebaseType();
-	}
-
-	/**
-	 * Type of the mini cluster to start.
-	 *
-	 * @deprecated Will be irrelevant once the legacy mode has been removed.
-	 */
-	@Deprecated
-	public enum CodebaseType {
-		LEGACY,
-		NEW
-	}
-
 	/**
 	 * Comparator for comparable Tuples.
 	 * @param <T> tuple type
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 66a894f..56df46e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +57,6 @@ public class JobRetrievalITCase extends TestLogger {
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(4)
-			.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
 			.build());
 
 	private RestClusterClient<StandaloneClusterId> client;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 4826a46..f62ccf7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -80,9 +79,6 @@ public class AutoParallelismITCase extends TestLogger {
 			assertEquals(PARALLELISM, resultCollection.size());
 		}
 		catch (Exception ex) {
-			if (MINI_CLUSTER_RESOURCE.getCodebaseType() == TestBaseUtils.CodebaseType.LEGACY) {
-				throw ex;
-			}
 			assertTrue(
 				ExceptionUtils.findThrowableWithMessage(ex, ExecutionGraphBuilder.PARALLELISM_AUTO_MAX_ERROR_MESSAGE).isPresent());
 		}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f..3763f65 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -75,7 +74,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Scanner;
 import java.util.Set;
 import java.util.UUID;
@@ -153,7 +151,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 	protected org.apache.flink.configuration.Configuration flinkConfiguration;
 
-	protected boolean isNewMode;
+	protected final boolean isNewMode = true;
 
 	static {
 		YARN_CONFIGURATION = new YarnConfiguration();
@@ -198,8 +196,6 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 
 		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
-
-		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
 	}
 
 	/**
@@ -552,9 +548,6 @@ public abstract class YarnTestBase extends TestLogger {
 
 			FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
 
-			globalConfiguration.setString(CoreOptions.MODE,
-				Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
-
 			BootstrapTools.writeConfiguration(
 				globalConfiguration,
 				new File(tempConfPathForSecureRun, "flink-conf.yaml"));