You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/28 09:47:11 UTC

[flink] branch master updated (9f5fd07 -> cc334db)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9f5fd07  [FLINK-10411] Introduce DispatcherResourceManagerComponentFactory
     new fd9b5de  [FLINK-10396] Remove CodebaseType
     new cc334db  [hotfix] Remove StandaloneMiniCluster from ScalaShellITCase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/webmonitor/WebFrontendITCase.java      | 36 +++-------
 .../webmonitor/handlers/JarRunHandlerTest.java     |  2 -
 .../webmonitor/history/HistoryServerTest.java      |  2 -
 .../apache/flink/api/scala/ScalaShellITCase.scala  | 52 +++++----------
 .../api/scala/ScalaShellLocalStartupITCase.scala   |  9 +--
 .../flink/test/util/MiniClusterResource.java       | 76 +---------------------
 .../util/MiniClusterResourceConfiguration.java     | 24 +------
 .../org/apache/flink/test/util/TestBaseUtils.java  | 22 -------
 .../test/example/client/JobRetrievalITCase.java    |  2 -
 .../flink/test/misc/AutoParallelismITCase.java     |  4 --
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  9 +--
 11 files changed, 31 insertions(+), 207 deletions(-)


[flink] 01/02: [FLINK-10396] Remove CodebaseType

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd9b5de41eb9935c7ff1350f3b6e5d818104c677
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sat Sep 22 19:54:07 2018 +0200

    [FLINK-10396] Remove CodebaseType
    
    CodebaseType was used to distinguish between the legacy mode and the new mode.
    This commit removes the CodebaseType and the codebase switch in the
    MiniClusterResource.
    
    This closes #6748.
---
 .../runtime/webmonitor/WebFrontendITCase.java      | 36 +++-------
 .../webmonitor/handlers/JarRunHandlerTest.java     |  2 -
 .../webmonitor/history/HistoryServerTest.java      |  2 -
 .../apache/flink/api/scala/ScalaShellITCase.scala  | 44 +++++--------
 .../api/scala/ScalaShellLocalStartupITCase.scala   |  9 +--
 .../flink/test/util/MiniClusterResource.java       | 76 +---------------------
 .../util/MiniClusterResourceConfiguration.java     | 24 +------
 .../org/apache/flink/test/util/TestBaseUtils.java  | 22 -------
 .../test/example/client/JobRetrievalITCase.java    |  2 -
 .../flink/test/misc/AutoParallelismITCase.java     |  4 --
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  9 +--
 11 files changed, 28 insertions(+), 202 deletions(-)

diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index b90277f..3ae830d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -152,11 +152,7 @@ public class WebFrontendITCase extends TestLogger {
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
-				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
-			} else {
-				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
-			}
+			Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
 		} else {
 			throw new RuntimeException("Request for non-existing job did not return an error.");
 		}
@@ -280,23 +276,13 @@ public class WebFrontendITCase extends TestLogger {
 		final Deadline deadline = testTimeout.fromNow();
 
 		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
-			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
-				// stop the job
-				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
-				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
-				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-				assertEquals("application/json; charset=UTF-8", response.getType());
-				assertEquals("{}", response.getContent());
-			} else {
-				// stop the job
-				client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
-				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
-
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
-				assertEquals("application/json; charset=UTF-8", response.getType());
-				assertEquals("{}", response.getContent());
-			}
+			// stop the job
+			client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+			assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+			assertEquals("application/json; charset=UTF-8", response.getType());
+			assertEquals("{}", response.getContent());
 		}
 
 		// wait for cancellation to finish
@@ -355,11 +341,7 @@ public class WebFrontendITCase extends TestLogger {
 			HttpTestClient.SimpleHttpResponse response = client
 				.getNextResponse(deadline.timeLeft());
 
-			if (CLUSTER.getCodebaseType() == TestBaseUtils.CodebaseType.NEW) {
-				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
-			} else {
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
-			}
+			assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 			assertEquals("application/json; charset=UTF-8", response.getType());
 			assertEquals("{}", response.getContent());
 		}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 6427f4d..a2138e1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.ClassRule;
@@ -67,7 +66,6 @@ public class JarRunHandlerTest {
 				.setConfiguration(config)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(1)
-				.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
 				.build());
 		clusterResource.before();
 
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4451e6d..18dd76e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -81,7 +80,6 @@ public class HistoryServerTest extends TestLogger {
 				.setConfiguration(clusterConfig)
 				.setNumberTaskManagers(1)
 				.setNumberSlotsPerTaskManager(1)
-				.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
 				.build());
 		cluster.before();
 	}
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index c6b43c2..6a41793 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,13 +19,11 @@
 package org.apache.flink.api.scala
 
 import java.io._
-import java.util.Objects
 
 import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
-import org.apache.flink.test.util.TestBaseUtils.CodebaseType
+import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -322,32 +320,20 @@ object ScalaShellITCase {
 
   @BeforeClass
   def beforeAll(): Unit = {
-    val isNew = TestBaseUtils.isNewCodebase()
-    if (isNew) {
-      configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
-      // set to different than default so not to interfere with ScalaShellLocalStartupITCase
-      configuration.setInteger(RestOptions.PORT, 8082)
-      val miniConfig = new MiniClusterConfiguration.Builder()
-        .setConfiguration(configuration)
-        .setNumSlotsPerTaskManager(parallelism)
-        .build()
-
-      val miniCluster = new MiniCluster(miniConfig)
-      miniCluster.start()
-      port = miniCluster.getRestAddress.getPort
-      hostname = miniCluster.getRestAddress.getHost
-
-      cluster = Some(Left(miniCluster))
-    } else {
-      configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
-      configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
-      val standaloneCluster = new StandaloneMiniCluster(configuration)
-
-      hostname = standaloneCluster.getHostname
-      port = standaloneCluster.getPort
-
-      cluster = Some(Right(standaloneCluster))
-    }
+    configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+    // set to different than default so not to interfere with ScalaShellLocalStartupITCase
+    configuration.setInteger(RestOptions.PORT, 8082)
+    val miniConfig = new MiniClusterConfiguration.Builder()
+      .setConfiguration(configuration)
+      .setNumSlotsPerTaskManager(parallelism)
+      .build()
+
+    val miniCluster = new MiniCluster(miniConfig)
+    miniCluster.start()
+    port = miniCluster.getRestAddress.getPort
+    hostname = miniCluster.getRestAddress.getHost
+
+    cluster = Some(Left(miniCluster))
   }
 
   @AfterClass
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index f13f57b..3952a0f 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.apache.flink.configuration.{Configuration, CoreOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.util.TestLogger
 import org.junit.rules.TemporaryFolder
 import org.junit.{Assert, Rule, Test}
@@ -86,12 +85,6 @@ class ScalaShellLocalStartupITCase extends TestLogger {
     System.setOut(new PrintStream(baos))
 
     val configuration = new Configuration()
-    val mode = if (TestBaseUtils.isNewCodebase()) {
-      CoreOptions.NEW_MODE
-    } else {
-      CoreOptions.LEGACY_MODE
-    }
-    configuration.setString(CoreOptions.MODE, mode)
 
     val dir = temporaryFolder.newFolder()
     BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml"))
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 1835fc6..9140bb4 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,10 +19,7 @@
 package org.apache.flink.test.util;
 
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.DefaultActorSystemLoader;
 import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -30,17 +27,12 @@ import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
-import org.junit.Assume;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -49,11 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-
 /**
  * Starts a Flink mini cluster as a resource and registers the respective
  * ExecutionEnvironment and StreamExecutionEnvironment.
@@ -66,8 +53,6 @@ public class MiniClusterResource extends ExternalResource {
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 
-	private final TestBaseUtils.CodebaseType codebaseType;
-
 	private JobExecutorService jobExecutorService;
 
 	private ClusterClient<?> clusterClient;
@@ -82,11 +67,6 @@ public class MiniClusterResource extends ExternalResource {
 
 	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
 		this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration);
-		this.codebaseType = miniClusterResourceConfiguration.getCodebaseType();
-	}
-
-	public TestBaseUtils.CodebaseType getCodebaseType() {
-		return codebaseType;
 	}
 
 	public int getNumberSlots() {
@@ -111,12 +91,9 @@ public class MiniClusterResource extends ExternalResource {
 
 	@Override
 	public void before() throws Exception {
-		// verify that we are running in the correct test profile
-		Assume.assumeThat(TestBaseUtils.getCodebaseType(), is(equalTo(codebaseType)));
-
 		temporaryFolder.create();
 
-		startJobExecutorService(codebaseType);
+		startMiniCluster();
 
 		numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers();
 
@@ -163,57 +140,6 @@ public class MiniClusterResource extends ExternalResource {
 		}
 	}
 
-	private void startJobExecutorService(TestBaseUtils.CodebaseType miniClusterType) throws Exception {
-		switch (miniClusterType) {
-			case LEGACY:
-				startLegacyMiniCluster();
-				break;
-			case NEW:
-				startMiniCluster();
-				break;
-			default:
-				throw new FlinkRuntimeException("Unknown MiniClusterType " + miniClusterType + '.');
-		}
-	}
-
-	private void startLegacyMiniCluster() throws Exception {
-		final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration());
-		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, miniClusterResourceConfiguration.getNumberTaskManagers());
-		configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
-		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
-
-		final LocalFlinkMiniCluster flinkMiniCluster = TestBaseUtils.startCluster(
-			configuration,
-			miniClusterResourceConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED);
-
-		jobExecutorService = flinkMiniCluster;
-
-		switch (miniClusterResourceConfiguration.getRpcServiceSharing()) {
-			case SHARED:
-				Option<ActorSystem> actorSystemOption = flinkMiniCluster.firstActorSystem();
-				Preconditions.checkState(actorSystemOption.isDefined());
-
-				final ActorSystem actorSystem = actorSystemOption.get();
-				clusterClient = new StandaloneClusterClient(
-					configuration,
-					flinkMiniCluster.highAvailabilityServices(),
-					true,
-					new DefaultActorSystemLoader(actorSystem));
-				break;
-			case DEDICATED:
-				clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true);
-				break;
-		}
-
-		Configuration restClientConfig = new Configuration();
-		restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
-		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
-
-		if (flinkMiniCluster.webMonitor().isDefined()) {
-			webUIPort = flinkMiniCluster.webMonitor().get().getServerPort();
-		}
-	}
-
 	private void startMiniCluster() throws Exception {
 		final Configuration configuration = miniClusterResourceConfiguration.getConfiguration();
 		configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath());
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index c938920..bd521a2 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -37,8 +37,6 @@ public class MiniClusterResourceConfiguration {
 
 	private final Time shutdownTimeout;
 
-	private final TestBaseUtils.CodebaseType codebaseType;
-
 	private final RpcServiceSharing rpcServiceSharing;
 
 	MiniClusterResourceConfiguration(
@@ -46,13 +44,11 @@ public class MiniClusterResourceConfiguration {
 		int numberTaskManagers,
 		int numberSlotsPerTaskManager,
 		Time shutdownTimeout,
-		TestBaseUtils.CodebaseType codebaseType,
 		RpcServiceSharing rpcServiceSharing) {
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.numberTaskManagers = numberTaskManagers;
 		this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
 		this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout);
-		this.codebaseType = Preconditions.checkNotNull(codebaseType);
 		this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing);
 	}
 
@@ -72,14 +68,6 @@ public class MiniClusterResourceConfiguration {
 		return shutdownTimeout;
 	}
 
-	/**
-	 * @deprecated Will be irrelevant once the legacy mode has been removed.
-	 */
-	@Deprecated
-	public TestBaseUtils.CodebaseType getCodebaseType() {
-		return codebaseType;
-	}
-
 	public RpcServiceSharing getRpcServiceSharing() {
 		return rpcServiceSharing;
 	}
@@ -93,7 +81,6 @@ public class MiniClusterResourceConfiguration {
 		private int numberTaskManagers = 1;
 		private int numberSlotsPerTaskManager = 1;
 		private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration);
-		private TestBaseUtils.CodebaseType codebaseType = TestBaseUtils.getCodebaseType();
 
 		private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED;
 
@@ -117,22 +104,13 @@ public class MiniClusterResourceConfiguration {
 			return this;
 		}
 
-		/**
-		 * @deprecated Will be irrelevant once the legacy mode has been removed.
-		 */
-		@Deprecated
-		public Builder setCodebaseType(TestBaseUtils.CodebaseType codebaseType) {
-			this.codebaseType = codebaseType;
-			return this;
-		}
-
 		public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) {
 			this.rpcServiceSharing = rpcServiceSharing;
 			return this;
 		}
 
 		public MiniClusterResourceConfiguration build() {
-			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+			return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
 		}
 	}
 }
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0f52f2..363327a 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,8 +44,6 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedReader;
 import java.io.File;
@@ -678,26 +676,6 @@ public class TestBaseUtils extends TestLogger {
 		throw new TimeoutException("Could not get HTTP response in time since the service is still unavailable.");
 	}
 
-	@Nonnull
-	public static CodebaseType getCodebaseType() {
-		return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
-	}
-
-	public static boolean isNewCodebase() {
-		return CodebaseType.NEW == getCodebaseType();
-	}
-
-	/**
-	 * Type of the mini cluster to start.
-	 *
-	 * @deprecated Will be irrelevant once the legacy mode has been removed.
-	 */
-	@Deprecated
-	public enum CodebaseType {
-		LEGACY,
-		NEW
-	}
-
 	/**
 	 * Comparator for comparable Tuples.
 	 * @param <T> tuple type
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 66a894f..56df46e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +57,6 @@ public class JobRetrievalITCase extends TestLogger {
 		new MiniClusterResourceConfiguration.Builder()
 			.setNumberTaskManagers(1)
 			.setNumberSlotsPerTaskManager(4)
-			.setCodebaseType(TestBaseUtils.CodebaseType.NEW)
 			.build());
 
 	private RestClusterClient<StandaloneClusterId> client;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 4826a46..f62ccf7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
@@ -80,9 +79,6 @@ public class AutoParallelismITCase extends TestLogger {
 			assertEquals(PARALLELISM, resultCollection.size());
 		}
 		catch (Exception ex) {
-			if (MINI_CLUSTER_RESOURCE.getCodebaseType() == TestBaseUtils.CodebaseType.LEGACY) {
-				throw ex;
-			}
 			assertTrue(
 				ExceptionUtils.findThrowableWithMessage(ex, ExecutionGraphBuilder.PARALLELISM_AUTO_MAX_ERROR_MESSAGE).isPresent());
 		}
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f..3763f65 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -75,7 +74,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Scanner;
 import java.util.Set;
 import java.util.UUID;
@@ -153,7 +151,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 	protected org.apache.flink.configuration.Configuration flinkConfiguration;
 
-	protected boolean isNewMode;
+	protected final boolean isNewMode = true;
 
 	static {
 		YARN_CONFIGURATION = new YarnConfiguration();
@@ -198,8 +196,6 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 
 		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
-
-		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
 	}
 
 	/**
@@ -552,9 +548,6 @@ public abstract class YarnTestBase extends TestLogger {
 
 			FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
 
-			globalConfiguration.setString(CoreOptions.MODE,
-				Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
-
 			BootstrapTools.writeConfiguration(
 				globalConfiguration,
 				new File(tempConfPathForSecureRun, "flink-conf.yaml"));


[flink] 02/02: [hotfix] Remove StandaloneMiniCluster from ScalaShellITCase

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc334db80face215250154047685466f7e01cb97
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Sep 27 16:13:57 2018 +0200

    [hotfix] Remove StandaloneMiniCluster from ScalaShellITCase
---
 .../scala/org/apache/flink/api/scala/ScalaShellITCase.scala  | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 6a41793..54bb16f 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -20,10 +20,9 @@ package org.apache.flink.api.scala
 
 import java.io._
 
-import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions}
 import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
 import org.apache.flink.util.TestLogger
 import org.junit._
 import org.junit.rules.TemporaryFolder
@@ -312,7 +311,7 @@ class ScalaShellITCase extends TestLogger {
 object ScalaShellITCase {
 
   val configuration = new Configuration()
-  var cluster: Option[Either[MiniCluster, StandaloneMiniCluster]] = None
+  var cluster: Option[MiniCluster] = None
 
   var port: Int = _
   var hostname : String = _
@@ -333,7 +332,7 @@ object ScalaShellITCase {
     port = miniCluster.getRestAddress.getPort
     hostname = miniCluster.getRestAddress.getHost
 
-    cluster = Some(Left(miniCluster))
+    cluster = Some(miniCluster)
   }
 
   @AfterClass
@@ -342,8 +341,7 @@ object ScalaShellITCase {
     Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
 
     cluster.foreach {
-      case Left(miniCluster) => miniCluster.close()
-      case Right(miniCluster) => miniCluster.close()
+      miniCluster => miniCluster.close()
     }
   }