You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:28 UTC

[8/9] flink git commit: [FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource

[FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource

This closes #5665.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/420190d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/420190d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/420190d8

Branch: refs/heads/master
Commit: 420190d85cdab414fa24db9c161852fcbcb81451
Parents: 31f4036
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 11:14:46 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebFrontendITCase.java   | 202 +++++++++++++------
 .../webmonitor/testutils/HttpTestClient.java    |  19 ++
 .../flink/test/util/MiniClusterResource.java    |   4 +-
 3 files changed, 166 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 14602e3..f512766 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,15 +19,19 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -38,8 +42,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.File;
@@ -47,8 +52,13 @@ import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Files;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
@@ -66,40 +76,49 @@ public class WebFrontendITCase extends TestLogger {
 	private static final int NUM_TASK_MANAGERS = 2;
 	private static final int NUM_SLOTS = 4;
 
-	private static LocalFlinkMiniCluster cluster;
+	private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
 
-	private static int port = -1;
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			CLUSTER_CONFIGURATION,
+			NUM_TASK_MANAGERS,
+			NUM_SLOTS),
+		true
+	);
 
-	@BeforeClass
-	public static void initialize() throws Exception {
+	private static Configuration getClusterConfiguration() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
-		File logDir = File.createTempFile("TestBaseUtils-logdir", null);
-		assertTrue("Unable to delete temp file", logDir.delete());
-		assertTrue("Unable to create temp directory", logDir.mkdir());
-		File logFile = new File(logDir, "jobmanager.log");
-		File outFile = new File(logDir, "jobmanager.out");
+		try {
+			File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+			assertTrue("Unable to delete temp file", logDir.delete());
+			assertTrue("Unable to create temp directory", logDir.mkdir());
+			File logFile = new File(logDir, "jobmanager.log");
+			File outFile = new File(logDir, "jobmanager.out");
 
-		Files.createFile(logFile.toPath());
-		Files.createFile(outFile.toPath());
+			Files.createFile(logFile.toPath());
+			Files.createFile(outFile.toPath());
 
-		config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
-		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+			config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
+			config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+		} catch (Exception e) {
+			throw new AssertionError("Could not setup test.", e);
+		}
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
-		cluster = new LocalFlinkMiniCluster(config, false);
-		cluster.start();
+		return config;
+	}
 
-		port = cluster.webMonitor().get().getServerPort();
+	@After
+	public void tearDown() {
+		BlockingInvokable.reset();
 	}
 
 	@Test
 	public void getFrontPage() {
 		try {
-			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
+			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html");
 			String text = "Apache Flink Dashboard";
 			assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
 		} catch (Exception e) {
@@ -111,7 +130,7 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void testResponseHeaders() throws Exception {
 		// check headers for successful json response
-		URL taskManagersUrl = new URL("http://localhost:" + port + "/taskmanagers");
+		URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers");
 		HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection();
 		taskManagerConnection.setConnectTimeout(100000);
 		taskManagerConnection.connect();
@@ -127,14 +146,18 @@ public class WebFrontendITCase extends TestLogger {
 		Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType());
 
 		// check headers in case of an error
-		URL notFoundJobUrl = new URL("http://localhost:" + port + "/jobs/dontexist");
+		URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist");
 		HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection();
 		notFoundJobConnection.setConnectTimeout(100000);
 		notFoundJobConnection.connect();
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
+			} else {
+				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+			}
 		} else {
 			throw new RuntimeException("Request for non-existing job did not return an error.");
 		}
@@ -143,14 +166,14 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void getNumberOfTaskManagers() {
 		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
 
 			ObjectMapper mapper = new ObjectMapper();
 			JsonNode response = mapper.readTree(json);
 			ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
 
 			assertNotNull(taskManagers);
-			assertEquals(cluster.numTaskManagers(), taskManagers.size());
+			assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
@@ -159,14 +182,14 @@ public class WebFrontendITCase extends TestLogger {
 
 	@Test
 	public void getTaskmanagers() throws Exception {
-		String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+		String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode parsed = mapper.readTree(json);
 		ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
 
 		assertNotNull(taskManagers);
-		assertEquals(cluster.numTaskManagers(), taskManagers.size());
+		assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
 
 		JsonNode taskManager = taskManagers.get(0);
 		assertNotNull(taskManager);
@@ -176,21 +199,21 @@ public class WebFrontendITCase extends TestLogger {
 
 	@Test
 	public void getLogAndStdoutFiles() throws Exception {
-		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
 
 		FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
+		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log");
 		assertTrue(logs.contains("job manager log"));
 
 		FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-		logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
+		logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout");
 		assertTrue(logs.contains("job manager out"));
 	}
 
 	@Test
 	public void getTaskManagerLogAndStdoutFiles() {
 		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
 
 			ObjectMapper mapper = new ObjectMapper();
 			JsonNode parsed = mapper.readTree(json);
@@ -198,15 +221,15 @@ public class WebFrontendITCase extends TestLogger {
 			JsonNode taskManager = taskManagers.get(0);
 			String id = taskManager.get("id").asText();
 
-			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
 
 			//we check for job manager log files, since no separate taskmanager logs exist
 			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log");
 			assertTrue(logs.contains("job manager log"));
 
 			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-			logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+			logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout");
 			assertTrue(logs.contains("job manager out"));
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -217,12 +240,12 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void getConfiguration() {
 		try {
-			String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");
+			String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config");
 
 			Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
 			assertEquals(
-				cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
-				conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
+				CLUSTER_CONFIGURATION.getString(ConfigConstants.LOCAL_START_WEBSERVER, null),
+				conf.get(ConfigConstants.LOCAL_START_WEBSERVER));
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
@@ -232,29 +255,42 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void testStop() throws Exception {
 		// this only works if there is no active job at this point
-		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+		assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
 
 		// Create a task
 		final JobVertex sender = new JobVertex("Sender");
 		sender.setParallelism(2);
-		sender.setInvokableClass(StoppableInvokable.class);
+		sender.setInvokableClass(BlockingInvokable.class);
 
 		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
 		final JobID jid = jobGraph.getJobID();
 
-		cluster.submitJobDetached(jobGraph);
+		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		clusterClient.setDetached(true);
+		clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
 
 		// wait for job to show up
-		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(10);
 		}
 
+		// wait for tasks to be properly running
+		BlockingInvokable.latch.await();
+
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
-				// Request the file from the web server
+		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+				// stop the job
+				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+				assertEquals("application/json; charset=UTF-8", response.getType());
+				assertEquals("{}", response.getContent());
+			} else {
+				// stop the job
 				client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
 				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
 
@@ -262,12 +298,15 @@ public class WebFrontendITCase extends TestLogger {
 				assertEquals("application/json; charset=UTF-8", response.getType());
 				assertEquals("{}", response.getContent());
 			}
+		}
 
+		// wait for the stopped job to finish
+		while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(20);
 		}
 
 		// ensure we can access job details when its finished (FLINK-4011)
-		try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
 			FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 			client.sendGetRequest("/jobs/" + jid + "/config", timeout);
 			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
@@ -283,40 +322,89 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void testStopYarn() throws Exception {
 		// this only works if there is no active job at this point
-		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+		assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
 
 		// Create a task
 		final JobVertex sender = new JobVertex("Sender");
 		sender.setParallelism(2);
-		sender.setInvokableClass(StoppableInvokable.class);
+		sender.setInvokableClass(BlockingInvokable.class);
 
 		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
 		final JobID jid = jobGraph.getJobID();
 
-		cluster.submitJobDetached(jobGraph);
+		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		clusterClient.setDetached(true);
+		clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
 
 		// wait for job to show up
-		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(10);
 		}
 
+		// wait for tasks to be properly running
+		BlockingInvokable.latch.await();
+
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+		while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
+			try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
 				// Request the file from the web server
 				client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
 
 				HttpTestClient.SimpleHttpResponse response = client
 					.getNextResponse(deadline.timeLeft());
 
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
+				if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+					assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+				} else {
+					assertEquals(HttpResponseStatus.OK, response.getStatus());
+				}
 				assertEquals("application/json; charset=UTF-8", response.getType());
 				assertEquals("{}", response.getContent());
 			}
 
 			Thread.sleep(20);
 		}
+		BlockingInvokable.reset();
+	}
+
+	private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Test invokable that is stoppable and allows waiting for all subtasks to be running.
+	 */
+	public static class BlockingInvokable extends AbstractInvokable implements StoppableTask {
+
+		private static CountDownLatch latch = new CountDownLatch(2);
+
+		private volatile boolean isRunning = true;
+
+		public BlockingInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			latch.countDown();
+			while (isRunning) {
+				Thread.sleep(100);
+			}
+		}
+
+		@Override
+		public void stop() {
+			this.isRunning = false;
+		}
+
+		public static void reset() {
+			latch = new CountDownLatch(2);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index d9608fe..d94f7a2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -186,6 +186,25 @@ public class HttpTestClient implements AutoCloseable {
 	}
 
 	/**
+	 * Sends a simple PATCH request to the given path. You only specify the $path part of
+	 * http://$host:$port/$path.
+	 *
+	 * @param path The $path to PATCH (http://$host:$port/$path)
+	 */
+	public void sendPatchRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException {
+		if (!path.startsWith("/")) {
+			path = "/" + path;
+		}
+
+		HttpRequest getRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+			HttpMethod.PATCH, path);
+		getRequest.headers().set(HttpHeaders.Names.HOST, host);
+		getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		sendRequest(getRequest, timeout);
+	}
+
+	/**
 	 * Returns the next available HTTP response. A call to this method blocks until a response
 	 * becomes available.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 160c1d1..8c21b37 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -55,9 +55,9 @@ public class MiniClusterResource extends ExternalResource {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
 
-	private static final String CODEBASE_KEY = "codebase";
+	public static final String CODEBASE_KEY = "codebase";
 
-	private static final String NEW_CODEBASE = "new";
+	public static final String NEW_CODEBASE = "new";
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;