Posted to commits@flink.apache.org by ch...@apache.org on 2018/04/19 08:00:21 UTC

[1/9] flink git commit: [FLINK-8661] Replace Collections.EMPTY_MAP with Collections.emptyMap()

Repository: flink
Updated Branches:
  refs/heads/master ae0b496c4 -> 236d28983


[FLINK-8661] Replace Collections.EMPTY_MAP with Collections.emptyMap()

This closes #5864.
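
Why the change matters: Collections.EMPTY_MAP is the raw, pre-generics
constant, so handing it to a parameterized signature such as ResourceProfile's
constructor only compiles through an unchecked conversion, while
Collections.emptyMap() is generic and lets the compiler infer the type
arguments. A minimal sketch, assuming a hypothetical takesTypedMap method in
place of Flink's actual constructor:

    import java.util.Collections;
    import java.util.Map;

    class EmptyMapDemo {
        // stand-in for a parameterized signature like ResourceProfile's constructor
        static void takesTypedMap(Map<String, Long> extendedResources) { }

        public static void main(String[] args) {
            takesTypedMap(Collections.EMPTY_MAP);  // raw type: unchecked-conversion warning
            takesTypedMap(Collections.emptyMap()); // inferred as Map<String, Long>: clean
        }
    }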


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/469cab48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/469cab48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/469cab48

Branch: refs/heads/master
Commit: 469cab48d726a7de5ddd3d2991da05d59d8499b5
Parents: 9e0c42e
Author: zhangminglei <zm...@163.com>
Authored: Wed Apr 18 09:24:57 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:14 2018 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/types/ResourceProfile.java      | 2 +-
 .../runtime/clusterframework/types/ResourceProfileTest.java  | 8 ++++----
 .../flink/runtime/rest/messages/JobAccumulatorsInfoTest.java | 2 +-
 .../src/test/java/org/apache/flink/yarn/UtilsTest.java       | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/469cab48/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 8fbaed1..a89b9f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -106,7 +106,7 @@ public class ResourceProfile implements Serializable, Comparable<ResourceProfile
 	 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 	 */
 	public ResourceProfile(double cpuCores, int heapMemoryInMB) {
-		this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.EMPTY_MAP);
+		this(cpuCores, heapMemoryInMB, 0, 0, 0, Collections.emptyMap());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/469cab48/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
index 7ed688a..6f54d7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/types/ResourceProfileTest.java
@@ -31,10 +31,10 @@ public class ResourceProfileTest {
 
 	@Test
 	public void testMatchRequirement() throws Exception {
-		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
-		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
-		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.EMPTY_MAP);
-		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.EMPTY_MAP);
+		ResourceProfile rp1 = new ResourceProfile(1.0, 100, 100, 100, 0, Collections.emptyMap());
+		ResourceProfile rp2 = new ResourceProfile(1.0, 200, 200, 200, 0, Collections.emptyMap());
+		ResourceProfile rp3 = new ResourceProfile(2.0, 100, 100, 100, 0, Collections.emptyMap());
+		ResourceProfile rp4 = new ResourceProfile(2.0, 200, 200, 200, 0, Collections.emptyMap());
 
 		assertFalse(rp1.isMatching(rp2));
 		assertTrue(rp2.isMatching(rp1));

http://git-wip-us.apache.org/repos/asf/flink/blob/469cab48/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
index e0e9649..856d855 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
@@ -47,6 +47,6 @@ public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<Job
 			"uta3.type",
 			"uta3.value"));
 
-		return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList, Collections.EMPTY_MAP);
+		return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList, Collections.emptyMap());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/469cab48/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 578e8e2..b7a38b0 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -201,7 +201,7 @@ public class UtilsTest extends TestLogger {
 
 				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
 
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
 
 				for (int i = 0; i < containerList.size(); i++) {
 					expectMsgClass(deadline.timeLeft(), Acknowledge.class);
@@ -217,7 +217,7 @@ public class UtilsTest extends TestLogger {
 
 				expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class);
 
-				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST));
+				resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList()));
 
 				for (Container container: containerList) {
 					resourceManagerGateway.tell(


[6/9] flink git commit: [FLINK-8704][tests] Port ClassLoaderITCase to flip6

Posted by ch...@apache.org.
[FLINK-8704][tests] Port ClassLoaderITCase to flip6

This closes #5780.
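
Beyond swapping the legacy TestingCluster for the flip6 MiniCluster, the port
rewrites the checkpointed-streaming test to search the exception's cause chain
for the user-jar SuccessException by class name (via ExceptionUtils.findThrowable
in the diff below) instead of matching nested hamcrest properties. A rough,
self-contained sketch of what that cause-chain lookup does; this stand-in only
illustrates the walk and is not Flink's implementation:

    import java.util.Optional;
    import java.util.function.Predicate;

    final class CauseChain {
        // Walk t, t.getCause(), t.getCause().getCause(), ... until the
        // predicate matches or the chain ends.
        static Optional<Throwable> findThrowable(Throwable top, Predicate<Throwable> p) {
            for (Throwable t = top; t != null; t = t.getCause()) {
                if (p.test(t)) {
                    return Optional.of(t);
                }
            }
            return Optional.empty();
        }
    }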


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/674a2d34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/674a2d34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/674a2d34

Branch: refs/heads/master
Commit: 674a2d3464cc5628961f3b9628713b9515ba6c6f
Parents: 420190d
Author: zentol <ch...@apache.org>
Authored: Thu Apr 5 11:49:48 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../test/classloading/ClassLoaderITCase.java    | 159 ++++----
 .../classloading/LegacyClassLoaderITCase.java   | 399 +++++++++++++++++++
 2 files changed, 472 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/674a2d34/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index b357904..089ade4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -19,35 +19,35 @@
 package org.apache.flink.test.classloading;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.MiniClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.testdata.KMeansData;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -56,25 +56,22 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.isA;
-import static org.hamcrest.Matchers.hasProperty;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 /**
  * Test job classloader.
  */
+@Category(New.class)
 public class ClassLoaderITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ClassLoaderITCase.class);
@@ -95,22 +92,21 @@ public class ClassLoaderITCase extends TestLogger {
 
 	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
 
-	@ClassRule
-	public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+	private static final TemporaryFolder FOLDER = new TemporaryFolder();
 
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
-	private static TestingCluster testCluster;
+	private static MiniCluster testCluster;
 
-	private static int parallelism;
+	private static final int parallelism = 4;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
+		FOLDER.create();
+
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-		parallelism = 4;
 
 		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
 		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
@@ -121,16 +117,29 @@ public class ClassLoaderITCase extends TestLogger {
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
 				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
 
-		testCluster = new TestingCluster(config, false);
+		// required as we otherwise run out of memory
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80);
+
+		testCluster = new MiniCluster(
+			new MiniClusterConfiguration.Builder()
+				.setNumTaskManagers(2)
+				.setNumSlotsPerTaskManager(2)
+				.setConfiguration(config)
+			.build()
+		);
 		testCluster.start();
 	}
 
 	@AfterClass
-	public static void tearDown() throws Exception {
+	public static void tearDownClass() throws Exception {
 		if (testCluster != null) {
-			testCluster.stop();
+			testCluster.close();
 		}
+		FOLDER.delete();
+	}
 
+	@After
+	public void tearDown() throws Exception {
 		TestStreamEnvironment.unsetAsContext();
 		TestEnvironment.unsetAsContext();
 	}
@@ -202,15 +211,27 @@ public class ClassLoaderITCase extends TestLogger {
 			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
 			Collections.<URL>emptyList());
 
-		// Program should terminate with a 'SuccessException':
-		// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
-		expectedException.expectCause(
-			Matchers.<Throwable>hasProperty("cause",
-				hasProperty("class",
-					hasProperty("canonicalName", equalTo(
-						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
-
-		streamingCheckpointedProg.invokeInteractiveModeForExecution();
+		try {
+			streamingCheckpointedProg.invokeInteractiveModeForExecution();
+		} catch (Exception e) {
+			// Program should terminate with a 'SuccessException':
+			// the exception class is contained in the user-jar, but is not present on the maven classpath
+			// the deserialization of the exception should thus fail here
+			try {
+				Optional<Throwable> exception = ExceptionUtils.findThrowable(e,
+					candidate -> candidate.getClass().getCanonicalName().equals("org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException"));
+
+				// if we reach this point we either failed due to another exception,
+				// or the deserialization of the user-exception did not fail
+				if (!exception.isPresent()) {
+					throw e;
+				} else {
+					Assert.fail("Deserialization of user exception should have failed.");
+				}
+			} catch (NoClassDefFoundError expected) {
+				// expected
+			}
+		}
 	}
 
 	@Test
@@ -234,15 +255,7 @@ public class ClassLoaderITCase extends TestLogger {
 
 	@Test
 	public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		int port = testCluster.getLeaderRPCPort();
-
-		// test FLINK-3633
-		final PackagedProgram userCodeTypeProg = new PackagedProgram(
-			new File(USERCODETYPE_JAR_PATH),
-			new String[] { USERCODETYPE_JAR_PATH,
-				"localhost",
-				String.valueOf(port),
-			});
+		PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH));
 
 		TestEnvironment.setAsContext(
 			testCluster,
@@ -282,6 +295,8 @@ public class ClassLoaderITCase extends TestLogger {
 	 */
 	@Test
 	public void testDisposeSavepointWithCustomKvState() throws Exception {
+		ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), testCluster);
+
 		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
 
 		File checkpointDir = FOLDER.newFolder();
@@ -324,19 +339,18 @@ public class ClassLoaderITCase extends TestLogger {
 		// The job ID
 		JobID jobId = null;
 
-		ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
-
 		LOG.info("Waiting for job status running.");
 
 		// Wait for running job
 		while (jobId == null && deadline.hasTimeLeft()) {
-			Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
-			RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
 
-			for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
-				jobId = runningJob.getJobId();
-				LOG.info("Job running. ID: " + jobId);
-				break;
+			Collection<JobStatusMessage> jobs = clusterClient.listJobs().get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			for (JobStatusMessage job : jobs) {
+				if (job.getJobState() == JobStatus.RUNNING) {
+					jobId = job.getJobId();
+					LOG.info("Job running. ID: " + jobId);
+					break;
+				}
 			}
 
 			// Retry if job is not available yet
@@ -345,52 +359,25 @@ public class ClassLoaderITCase extends TestLogger {
 			}
 		}
 
-		LOG.info("Wait for all tasks to be running.");
-		Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
-		Await.ready(allRunning, deadline.timeLeft());
-		LOG.info("All tasks are running.");
-
 		// Trigger savepoint
 		String savepointPath = null;
 		for (int i = 0; i < 20; i++) {
 			LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
-			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-
-			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
-
-			if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
-				savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
-				LOG.info("Triggered savepoint. Path: " + savepointPath);
-			} else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
-				Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+			try {
+				savepointPath = clusterClient.triggerSavepoint(jobId, null)
+					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			} catch (Exception cause) {
 				LOG.info("Failed to trigger savepoint. Retrying...", cause);
 				// This can fail if the operators are not opened yet
 				Thread.sleep(500);
-			} else {
-				throw new IllegalStateException("Unexpected response to TriggerSavepoint");
 			}
 		}
 
 		assertNotNull("Failed to trigger savepoint", savepointPath);
 
-		// Dispose savepoint
-		LOG.info("Disposing savepoint at " + savepointPath);
-		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
-		Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
-
-		if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
-			// Success :-)
-			LOG.info("Disposed savepoint at " + savepointPath);
-		} else if (disposeResponse instanceof DisposeSavepointFailure) {
-			throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
-		} else {
-			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
-		}
+		clusterClient.disposeSavepoint(savepointPath).get();
 
-		// Cancel job, wait for success
-		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
-		Object response = Await.result(cancelFuture, deadline.timeLeft());
-		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+		clusterClient.cancel(jobId);
 
 		// make sure, the execution is finished to not influence other test methods
 		invokeThread.join(deadline.timeLeft().toMillis());

http://git-wip-us.apache.org/repos/asf/flink/blob/674a2d34/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
new file mode 100644
index 0000000..fa114e1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/LegacyClassLoaderITCase.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
+import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.testdata.KMeansData;
+import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.test.util.TestEnvironment;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.Matchers;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test job classloader.
+ */
+public class LegacyClassLoaderITCase extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LegacyClassLoaderITCase.class);
+
+	private static final String INPUT_SPLITS_PROG_JAR_FILE = "customsplit-test-jar.jar";
+
+	private static final String STREAMING_INPUT_SPLITS_PROG_JAR_FILE = "streaming-customsplit-test-jar.jar";
+
+	private static final String STREAMING_PROG_JAR_FILE = "streamingclassloader-test-jar.jar";
+
+	private static final String STREAMING_CHECKPOINTED_PROG_JAR_FILE = "streaming-checkpointed-classloader-test-jar.jar";
+
+	private static final String KMEANS_JAR_PATH = "kmeans-test-jar.jar";
+
+	private static final String USERCODETYPE_JAR_PATH = "usercodetype-test-jar.jar";
+
+	private static final String CUSTOM_KV_STATE_JAR_PATH = "custom_kv_state-test-jar.jar";
+
+	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
+
+	@ClassRule
+	public static final TemporaryFolder FOLDER = new TemporaryFolder();
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	private static TestingCluster testCluster;
+
+	private static int parallelism;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		parallelism = 4;
+
+		// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
+		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+		// Savepoint path
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
+				FOLDER.newFolder().getAbsoluteFile().toURI().toString());
+
+		testCluster = new TestingCluster(config, false);
+		testCluster.start();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testCluster != null) {
+			testCluster.stop();
+		}
+
+		TestStreamEnvironment.unsetAsContext();
+		TestEnvironment.unsetAsContext();
+	}
+
+	@Test
+	public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
+
+		PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		inputSplitTestProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
+		URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+		PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.<Path>emptyList(),
+			Collections.singleton(classpath));
+
+		inputSplitTestProg2.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		// regular streaming job
+		PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		streamingProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+		// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
+		PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		// Program should terminate with a 'SuccessException':
+		// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
+		expectedException.expectCause(
+			Matchers.<Throwable>hasProperty("cause",
+				hasProperty("class",
+					hasProperty("canonicalName", equalTo(
+						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
+
+		streamingCheckpointedProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		PackagedProgram kMeansProg = new PackagedProgram(
+			new File(KMEANS_JAR_PATH),
+			new String[] {
+				KMeansData.DATAPOINTS,
+				KMeansData.INITIAL_CENTERS,
+				"25"
+			});
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(KMEANS_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		kMeansProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		int port = testCluster.getLeaderRPCPort();
+
+		// test FLINK-3633
+		final PackagedProgram userCodeTypeProg = new PackagedProgram(
+			new File(USERCODETYPE_JAR_PATH),
+			new String[] { USERCODETYPE_JAR_PATH,
+				"localhost",
+				String.valueOf(port),
+			});
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		userCodeTypeProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		File checkpointDir = FOLDER.newFolder();
+		File outputDir = FOLDER.newFolder();
+
+		final PackagedProgram program = new PackagedProgram(
+			new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
+			new String[] {
+				checkpointDir.toURI().toString(),
+				outputDir.toURI().toString()
+			});
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		expectedException.expectCause(
+			Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
+
+		program.invokeInteractiveModeForExecution();
+	}
+
+	/**
+	 * Tests disposal of a savepoint, which contains custom user code KvState.
+	 */
+	@Test
+	public void testDisposeSavepointWithCustomKvState() throws Exception {
+		Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow();
+
+		File checkpointDir = FOLDER.newFolder();
+		File outputDir = FOLDER.newFolder();
+
+		final PackagedProgram program = new PackagedProgram(
+				new File(CUSTOM_KV_STATE_JAR_PATH),
+				new String[] {
+						String.valueOf(parallelism),
+						checkpointDir.toURI().toString(),
+						"5000",
+						outputDir.toURI().toString()
+				});
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)),
+			Collections.<URL>emptyList()
+		);
+
+		// Execute detached
+		Thread invokeThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					program.invokeInteractiveModeForExecution();
+				} catch (ProgramInvocationException ignored) {
+					if (ignored.getCause() == null ||
+						!(ignored.getCause() instanceof JobCancellationException)) {
+						ignored.printStackTrace();
+					}
+				}
+			}
+		});
+
+		LOG.info("Starting program invoke thread");
+		invokeThread.start();
+
+		// The job ID
+		JobID jobId = null;
+
+		ActorGateway jm = testCluster.getLeaderGateway(deadline.timeLeft());
+
+		LOG.info("Waiting for job status running.");
+
+		// Wait for running job
+		while (jobId == null && deadline.hasTimeLeft()) {
+			Future<Object> jobsFuture = jm.ask(JobManagerMessages.getRequestRunningJobsStatus(), deadline.timeLeft());
+			RunningJobsStatus runningJobs = (RunningJobsStatus) Await.result(jobsFuture, deadline.timeLeft());
+
+			for (JobStatusMessage runningJob : runningJobs.getStatusMessages()) {
+				jobId = runningJob.getJobId();
+				LOG.info("Job running. ID: " + jobId);
+				break;
+			}
+
+			// Retry if job is not available yet
+			if (jobId == null) {
+				Thread.sleep(100L);
+			}
+		}
+
+		LOG.info("Wait for all tasks to be running.");
+		Future<Object> allRunning = jm.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
+		Await.ready(allRunning, deadline.timeLeft());
+		LOG.info("All tasks are running.");
+
+		// Trigger savepoint
+		String savepointPath = null;
+		for (int i = 0; i < 20; i++) {
+			LOG.info("Triggering savepoint (" + (i + 1) + "/20).");
+			Future<Object> savepointFuture = jm.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
+
+			Object savepointResponse = Await.result(savepointFuture, deadline.timeLeft());
+
+			if (savepointResponse.getClass() == TriggerSavepointSuccess.class) {
+				savepointPath = ((TriggerSavepointSuccess) savepointResponse).savepointPath();
+				LOG.info("Triggered savepoint. Path: " + savepointPath);
+			} else if (savepointResponse.getClass() == JobManagerMessages.TriggerSavepointFailure.class) {
+				Throwable cause = ((JobManagerMessages.TriggerSavepointFailure) savepointResponse).cause();
+				LOG.info("Failed to trigger savepoint. Retrying...", cause);
+				// This can fail if the operators are not opened yet
+				Thread.sleep(500);
+			} else {
+				throw new IllegalStateException("Unexpected response to TriggerSavepoint");
+			}
+		}
+
+		assertNotNull("Failed to trigger savepoint", savepointPath);
+
+		// Dispose savepoint
+		LOG.info("Disposing savepoint at " + savepointPath);
+		Future<Object> disposeFuture = jm.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+		Object disposeResponse = Await.result(disposeFuture, deadline.timeLeft());
+
+		if (disposeResponse.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
+			// Success :-)
+			LOG.info("Disposed savepoint at " + savepointPath);
+		} else if (disposeResponse instanceof DisposeSavepointFailure) {
+			throw new IllegalStateException("Failed to dispose savepoint " + disposeResponse);
+		} else {
+			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
+		}
+
+		// Cancel job, wait for success
+		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
+		Object response = Await.result(cancelFuture, deadline.timeLeft());
+		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
+
+		// make sure, the execution is finished to not influence other test methods
+		invokeThread.join(deadline.timeLeft().toMillis());
+		assertFalse("Program invoke thread still running", invokeThread.isAlive());
+	}
+}


[8/9] flink git commit: [FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource

Posted by ch...@apache.org.
[FLINK-8703][tests] Port WebFrontendITCase to MiniClusterResource

This closes #5665.
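
The port replaces the hand-managed LocalFlinkMiniCluster with a JUnit
MiniClusterResource rule that starts the cluster once per test class and tears
it down afterwards. A minimal usage sketch, assuming the same constructor
shape the diff below uses; the test body itself is hypothetical:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.test.util.MiniClusterResource;

    import org.junit.ClassRule;
    import org.junit.Test;

    public class ExampleITCase {
        @ClassRule
        public static final MiniClusterResource CLUSTER = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(
                new Configuration(), 2, 4),
            true);

        @Test
        public void webUiIsReachable() throws Exception {
            // the resource exposes the web UI port and a ClusterClient,
            // which the ported test below relies on
            System.out.println("Web UI listening on port " + CLUSTER.getWebUIPort());
        }
    }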


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/420190d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/420190d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/420190d8

Branch: refs/heads/master
Commit: 420190d85cdab414fa24db9c161852fcbcb81451
Parents: 31f4036
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 11:14:46 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../runtime/webmonitor/WebFrontendITCase.java   | 202 +++++++++++++------
 .../webmonitor/testutils/HttpTestClient.java    |  19 ++
 .../flink/test/util/MiniClusterResource.java    |   4 +-
 3 files changed, 166 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 14602e3..f512766 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,15 +19,19 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -38,8 +42,9 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.io.File;
@@ -47,8 +52,13 @@ import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.file.Files;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
@@ -66,40 +76,49 @@ public class WebFrontendITCase extends TestLogger {
 	private static final int NUM_TASK_MANAGERS = 2;
 	private static final int NUM_SLOTS = 4;
 
-	private static LocalFlinkMiniCluster cluster;
+	private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
 
-	private static int port = -1;
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			CLUSTER_CONFIGURATION,
+			NUM_TASK_MANAGERS,
+			NUM_SLOTS),
+		true
+	);
 
-	@BeforeClass
-	public static void initialize() throws Exception {
+	private static Configuration getClusterConfiguration() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS);
-		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
-
-		File logDir = File.createTempFile("TestBaseUtils-logdir", null);
-		assertTrue("Unable to delete temp file", logDir.delete());
-		assertTrue("Unable to create temp directory", logDir.mkdir());
-		File logFile = new File(logDir, "jobmanager.log");
-		File outFile = new File(logDir, "jobmanager.out");
+		try {
+			File logDir = File.createTempFile("TestBaseUtils-logdir", null);
+			assertTrue("Unable to delete temp file", logDir.delete());
+			assertTrue("Unable to create temp directory", logDir.mkdir());
+			File logFile = new File(logDir, "jobmanager.log");
+			File outFile = new File(logDir, "jobmanager.out");
 
-		Files.createFile(logFile.toPath());
-		Files.createFile(outFile.toPath());
+			Files.createFile(logFile.toPath());
+			Files.createFile(outFile.toPath());
 
-		config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
-		config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+			config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
+			config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.getAbsolutePath());
+		} catch (Exception e) {
+			throw new AssertionError("Could not setup test.", e);
+		}
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
+		config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
-		cluster = new LocalFlinkMiniCluster(config, false);
-		cluster.start();
+		return config;
+	}
 
-		port = cluster.webMonitor().get().getServerPort();
+	@After
+	public void tearDown() {
+		BlockingInvokable.reset();
 	}
 
 	@Test
 	public void getFrontPage() {
 		try {
-			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/index.html");
+			String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html");
 			String text = "Apache Flink Dashboard";
 			assertTrue("Startpage should contain " + text, fromHTTP.contains(text));
 		} catch (Exception e) {
@@ -111,7 +130,7 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void testResponseHeaders() throws Exception {
 		// check headers for successful json response
-		URL taskManagersUrl = new URL("http://localhost:" + port + "/taskmanagers");
+		URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers");
 		HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection();
 		taskManagerConnection.setConnectTimeout(100000);
 		taskManagerConnection.connect();
@@ -127,14 +146,18 @@ public class WebFrontendITCase extends TestLogger {
 		Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType());
 
 		// check headers in case of an error
-		URL notFoundJobUrl = new URL("http://localhost:" + port + "/jobs/dontexist");
+		URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist");
 		HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection();
 		notFoundJobConnection.setConnectTimeout(100000);
 		notFoundJobConnection.connect();
 		if (notFoundJobConnection.getResponseCode() >= 400) {
 			// we don't set the content-encoding header
 			Assert.assertNull(notFoundJobConnection.getContentEncoding());
-			Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+				Assert.assertEquals("application/json; charset=UTF-8", notFoundJobConnection.getContentType());
+			} else {
+				Assert.assertEquals("text/plain; charset=UTF-8", notFoundJobConnection.getContentType());
+			}
 		} else {
 			throw new RuntimeException("Request for non-existing job did not return an error.");
 		}
@@ -143,14 +166,14 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void getNumberOfTaskManagers() {
 		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
 
 			ObjectMapper mapper = new ObjectMapper();
 			JsonNode response = mapper.readTree(json);
 			ArrayNode taskManagers = (ArrayNode) response.get("taskmanagers");
 
 			assertNotNull(taskManagers);
-			assertEquals(cluster.numTaskManagers(), taskManagers.size());
+			assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
@@ -159,14 +182,14 @@ public class WebFrontendITCase extends TestLogger {
 
 	@Test
 	public void getTaskmanagers() throws Exception {
-		String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+		String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode parsed = mapper.readTree(json);
 		ArrayNode taskManagers = (ArrayNode) parsed.get("taskmanagers");
 
 		assertNotNull(taskManagers);
-		assertEquals(cluster.numTaskManagers(), taskManagers.size());
+		assertEquals(NUM_TASK_MANAGERS, taskManagers.size());
 
 		JsonNode taskManager = taskManagers.get(0);
 		assertNotNull(taskManager);
@@ -176,21 +199,21 @@ public class WebFrontendITCase extends TestLogger {
 
 	@Test
 	public void getLogAndStdoutFiles() throws Exception {
-		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+		WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
 
 		FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/log");
+		String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log");
 		assertTrue(logs.contains("job manager log"));
 
 		FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-		logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
+		logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout");
 		assertTrue(logs.contains("job manager out"));
 	}
 
 	@Test
 	public void getTaskManagerLogAndStdoutFiles() {
 		try {
-			String json = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/");
+			String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/");
 
 			ObjectMapper mapper = new ObjectMapper();
 			JsonNode parsed = mapper.readTree(json);
@@ -198,15 +221,15 @@ public class WebFrontendITCase extends TestLogger {
 			JsonNode taskManager = taskManagers.get(0);
 			String id = taskManager.get("id").asText();
 
-			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(cluster.configuration());
+			WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
 
 			//we check for job manager log files, since no separate taskmanager logs exist
 			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
-			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/log");
+			String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log");
 			assertTrue(logs.contains("job manager log"));
 
 			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
-			logs = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/taskmanagers/" + id + "/stdout");
+			logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout");
 			assertTrue(logs.contains("job manager out"));
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -217,12 +240,12 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void getConfiguration() {
 		try {
-			String config = TestBaseUtils.getFromHTTP("http://localhost:" + port + "/jobmanager/config");
+			String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config");
 
 			Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(config);
 			assertEquals(
-				cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
-				conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
+				CLUSTER_CONFIGURATION.getString(ConfigConstants.LOCAL_START_WEBSERVER, null),
+				conf.get(ConfigConstants.LOCAL_START_WEBSERVER));
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
@@ -232,29 +255,42 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void testStop() throws Exception {
 		// this only works if there is no active job at this point
-		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+		assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
 
 		// Create a task
 		final JobVertex sender = new JobVertex("Sender");
 		sender.setParallelism(2);
-		sender.setInvokableClass(StoppableInvokable.class);
+		sender.setInvokableClass(BlockingInvokable.class);
 
 		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
 		final JobID jid = jobGraph.getJobID();
 
-		cluster.submitJobDetached(jobGraph);
+		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		clusterClient.setDetached(true);
+		clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
 
 		// wait for job to show up
-		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(10);
 		}
 
+		// wait for tasks to be properly running
+		BlockingInvokable.latch.await();
+
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
-				// Request the file from the web server
+		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
+			if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+				// stop the job
+				client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft());
+				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
+
+				assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+				assertEquals("application/json; charset=UTF-8", response.getType());
+				assertEquals("{}", response.getContent());
+			} else {
+				// stop the job
 				client.sendDeleteRequest("/jobs/" + jid + "/stop", deadline.timeLeft());
 				HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
 
@@ -262,12 +298,15 @@ public class WebFrontendITCase extends TestLogger {
 				assertEquals("application/json; charset=UTF-8", response.getType());
 				assertEquals("{}", response.getContent());
 			}
+		}
 
+		// wait for cancellation to finish
+		while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(20);
 		}
 
 		// ensure we can access job details when its finished (FLINK-4011)
-		try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+		try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
 			FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
 			client.sendGetRequest("/jobs/" + jid + "/config", timeout);
 			HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
@@ -283,40 +322,89 @@ public class WebFrontendITCase extends TestLogger {
 	@Test
 	public void testStopYarn() throws Exception {
 		// this only works if there is no active job at this point
-		assertTrue(cluster.getCurrentlyRunningJobsJava().isEmpty());
+		assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
 
 		// Create a task
 		final JobVertex sender = new JobVertex("Sender");
 		sender.setParallelism(2);
-		sender.setInvokableClass(StoppableInvokable.class);
+		sender.setInvokableClass(BlockingInvokable.class);
 
 		final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
 		final JobID jid = jobGraph.getJobID();
 
-		cluster.submitJobDetached(jobGraph);
+		ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		clusterClient.setDetached(true);
+		clusterClient.submitJob(jobGraph, WebFrontendITCase.class.getClassLoader());
 
 		// wait for job to show up
-		while (cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+		while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
 			Thread.sleep(10);
 		}
 
+		// wait for tasks to be properly running
+		BlockingInvokable.latch.await();
+
 		final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES);
 		final Deadline deadline = testTimeout.fromNow();
 
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
-			try (HttpTestClient client = new HttpTestClient("localhost", port)) {
+		while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
+			try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) {
 				// Request the file from the web server
 				client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft());
 
 				HttpTestClient.SimpleHttpResponse response = client
 					.getNextResponse(deadline.timeLeft());
 
-				assertEquals(HttpResponseStatus.OK, response.getStatus());
+				if (Objects.equals(MiniClusterResource.NEW_CODEBASE, System.getProperty(MiniClusterResource.CODEBASE_KEY))) {
+					assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
+				} else {
+					assertEquals(HttpResponseStatus.OK, response.getStatus());
+				}
 				assertEquals("application/json; charset=UTF-8", response.getType());
 				assertEquals("{}", response.getContent());
 			}
 
 			Thread.sleep(20);
 		}
+		BlockingInvokable.reset();
+	}
+
+	private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * Test invokable that is stoppable and allows waiting for all subtasks to be running.
+	 */
+	public static class BlockingInvokable extends AbstractInvokable implements StoppableTask {
+
+		private static CountDownLatch latch = new CountDownLatch(2);
+
+		private volatile boolean isRunning = true;
+
+		public BlockingInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			latch.countDown();
+			while (isRunning) {
+				Thread.sleep(100);
+			}
+		}
+
+		@Override
+		public void stop() {
+			this.isRunning = false;
+		}
+
+		public static void reset() {
+			latch = new CountDownLatch(2);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
index d9608fe..d94f7a2 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/testutils/HttpTestClient.java
@@ -186,6 +186,25 @@ public class HttpTestClient implements AutoCloseable {
 	}
 
 	/**
+	 * Sends a simple PATCH request to the given path. You only specify the $path part of
+	 * http://$host:$host/$path.
+	 *
+	 * @param path The $path to PATCH (http://$host:$host/$path)
+	 */
+	public void sendPatchRequest(String path, FiniteDuration timeout) throws TimeoutException, InterruptedException {
+		if (!path.startsWith("/")) {
+			path = "/" + path;
+		}
+
+		HttpRequest getRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
+			HttpMethod.PATCH, path);
+		getRequest.headers().set(HttpHeaders.Names.HOST, host);
+		getRequest.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+
+		sendRequest(getRequest, timeout);
+	}
+
+	/**
 	 * Returns the next available HTTP response. A call to this method blocks until a response
 	 * becomes available.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/420190d8/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 160c1d1..8c21b37 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -55,9 +55,9 @@ public class MiniClusterResource extends ExternalResource {
 
 	private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class);
 
-	private static final String CODEBASE_KEY = "codebase";
+	public static final String CODEBASE_KEY = "codebase";
 
-	private static final String NEW_CODEBASE = "new";
+	public static final String NEW_CODEBASE = "new";
 
 	private final MiniClusterResourceConfiguration miniClusterResourceConfiguration;
 

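Making CODEBASE_KEY and NEW_CODEBASE public lets tests outside flink-test-utils branch on the active codebase, as the yarn-stop assertion above does. A condensed sketch of the pattern (the expected statuses are illustrative):

	boolean flip6 = Objects.equals(
		MiniClusterResource.NEW_CODEBASE,
		System.getProperty(MiniClusterResource.CODEBASE_KEY));
	assertEquals(
		flip6 ? HttpResponseStatus.ACCEPTED : HttpResponseStatus.OK,
		response.getStatus());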

[3/9] flink git commit: [FLINK-8960][tests] Port SavepointITCase to flip6

Posted by ch...@apache.org.
[FLINK-8960][tests] Port SavepointITCase to flip6

This closes #5806.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0ee05eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0ee05eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0ee05eb

Branch: refs/heads/master
Commit: b0ee05eb79433262f21d7945f090d65629496045
Parents: 674a2d3
Author: zentol <ch...@apache.org>
Authored: Tue Mar 27 14:45:03 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 510 +++++++------------
 1 file changed, 170 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
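
At its core the port replaces the TestingCluster/actor-message plumbing with a manually driven MiniClusterResource plus a ClusterClient. A sketch of the lifecycle the ported test follows (numTaskManagers, numSlotsPerTaskManager and jobGraph are assumed to be defined; error handling elided):

	MiniClusterResource cluster = new MiniClusterResource(
		new MiniClusterResource.MiniClusterResourceConfiguration(
			new Configuration(), numTaskManagers, numSlotsPerTaskManager),
		true); // true -> also create a ClusterClient
	// driven manually instead of as a JUnit @Rule, because the test
	// restarts the cluster between taking the savepoint and restoring it
	cluster.before();
	try {
		ClusterClient<?> client = cluster.getClusterClient();
		client.setDetached(true);
		client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
		String savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
	} finally {
		cluster.after();
	}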


http://git-wip-us.apache.org/repos/asf/flink/blob/b0ee05eb/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 888c418..9549dc7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -26,41 +26,20 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.OperatorState;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseSubmitTaskListener;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -69,15 +48,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.HashMultimap;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -87,27 +62,18 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.ArrayList;
+import java.net.URI;
+import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -142,264 +108,108 @@ public class SavepointITCase extends TestLogger {
 		final int numTaskManagers = 2;
 		final int numSlotsPerTaskManager = 2;
 		final int parallelism = numTaskManagers * numSlotsPerTaskManager;
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-		final File testRoot = folder.getRoot();
-
-		TestingCluster flink = null;
-
-		try {
-			// Create a test actor system
-			ActorSystem testActorSystem = AkkaUtils.createDefaultActorSystem();
-
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-
-			final File checkpointDir = new File(testRoot, "checkpoints");
-			final File savepointRootDir = new File(testRoot, "savepoints");
+		final File testRoot = folder.newFolder();
 
-			if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
-				fail("Test setup failed: failed to create temporary directories.");
-			}
-
-			// Use file based checkpoints
-			config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
-			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
-			config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
-			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
-
-			// Start Flink
-			flink = new TestingCluster(config);
-			flink.start(true);
-
-			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
-			final JobID jobId = jobGraph.getJobID();
-
-			// Reset the static test job helpers
-			StatefulCounter.resetForTest(parallelism);
+		Configuration config = new Configuration();
 
-			// Retrieve the job manager
-			ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
+		final File checkpointDir = new File(testRoot, "checkpoints");
+		final File savepointRootDir = new File(testRoot, "savepoints");
 
-			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
+		if (!checkpointDir.mkdir() || !savepointRootDir.mkdirs()) {
+			fail("Test setup failed: failed to create temporary directories.");
+		}
 
-			flink.submitJobDetached(jobGraph);
+		// Use file based checkpoints
+		config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
+		config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointRootDir.toURI().toString());
 
-			LOG.info("Waiting for some progress.");
+		MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);
 
-			// wait for the JobManager to be ready
-			Future<Object> allRunning = jobManager.ask(new WaitForAllVerticesToBeRunning(jobId), deadline.timeLeft());
-			Await.ready(allRunning, deadline.timeLeft());
+		String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
 
-			// wait for the Tasks to be ready
-			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+		restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);
+	}
 
-			LOG.info("Triggering a savepoint.");
-			Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobId, Option.<String>empty()), deadline.timeLeft());
-			final String savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+	private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+		final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+		final JobID jobId = jobGraph.getJobID();
+		StatefulCounter.resetForTest(parallelism);
 
-			// Retrieve the savepoint from the testing job manager
-			LOG.info("Requesting the savepoint.");
-			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
+		MiniClusterResource cluster = clusterFactory.get();
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 
-			SavepointV2 savepoint = (SavepointV2) ((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
-			LOG.info("Retrieved savepoint: " + savepointPath + ".");
+		try {
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 
-			// Shut down the Flink cluster (thereby canceling the job)
-			LOG.info("Shutting down Flink cluster.");
-			flink.stop();
-			flink = null;
+			StatefulCounter.getProgressLatch().await();
 
-			// - Verification START -------------------------------------------
+			String savepointPath = client.triggerSavepoint(jobId, null).get();
 
 			// Only one savepoint should exist
-			File[] files = savepointRootDir.listFiles();
-
-			if (files != null) {
-				assertEquals("Savepoint not created in expected directory", 1, files.length);
-				assertTrue("Savepoint did not create self-contained directory", files[0].isDirectory());
+			File savepointDir = new File(new URI(savepointPath));
+			assertTrue("Savepoint directory does not exist.", savepointDir.exists());
+			assertTrue("Savepoint did not create self-contained directory.", savepointDir.isDirectory());
 
-				File savepointDir = files[0];
-				File[] savepointFiles = savepointDir.listFiles();
-				assertNotNull(savepointFiles);
+			File[] savepointFiles = savepointDir.listFiles();
 
+			if (savepointFiles != null) {
 				// Expect one metadata file and one checkpoint file per stateful
 				// parallel subtask
 				String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: "
 					+ Arrays.toString(savepointFiles);
 				assertEquals(errMsg, 1 + parallelism, savepointFiles.length);
 			} else {
-				fail("Savepoint not created in expected directory");
+				fail(String.format("Returned savepoint path (%s) is not valid.", savepointPath));
 			}
 
-			// - Verification END ---------------------------------------------
-
-			// Restart the cluster
-			LOG.info("Restarting Flink cluster.");
-			flink = new TestingCluster(config);
-			flink.start();
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
-			// Reset static test helpers
+			return savepointPath;
+		} finally {
+			cluster.after();
 			StatefulCounter.resetForTest(parallelism);
+		}
+	}
 
-			// Gather all task deployment descriptors
-			final Throwable[] error = new Throwable[1];
-			final TestingCluster finalFlink = flink;
-			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
-
-			new JavaTestKit(testActorSystem) {{
-
-				new Within(deadline.timeLeft()) {
-					@Override
-					protected void run() {
-						try {
-							// Register to all submit task messages for job
-							for (ActorRef taskManager : finalFlink.getTaskManagersAsJava()) {
-								taskManager.tell(new TestingTaskManagerMessages
-									.RegisterSubmitTaskListener(jobId), getTestActor());
-							}
-
-							// Set the savepoint path
-							jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
-
-							LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
-								"savepoint path " + savepointPath + " in detached mode.");
-
-							// Submit the job
-							finalFlink.submitJobDetached(jobGraph);
-
-							int numTasks = 0;
-							for (JobVertex jobVertex : jobGraph.getVertices()) {
-								numTasks += jobVertex.getParallelism();
-							}
-
-							// Gather the task deployment descriptors
-							LOG.info("Gathering " + numTasks + " submitted " +
-								"TaskDeploymentDescriptor instances.");
-
-							for (int i = 0; i < numTasks; i++) {
-								ResponseSubmitTaskListener resp = (ResponseSubmitTaskListener)
-									expectMsgAnyClassOf(getRemainingTime(),
-										ResponseSubmitTaskListener.class);
-
-								TaskDeploymentDescriptor tdd = resp.tdd();
-
-								LOG.info("Received: " + tdd.toString() + ".");
-
-								TaskInformation taskInformation = tdd
-									.getSerializedTaskInformation()
-									.deserializeValue(getClass().getClassLoader());
-
-								tdds.put(taskInformation.getJobVertexId(), tdd);
-							}
-						} catch (Throwable t) {
-							error[0] = t;
-						}
-					}
-				};
-			}};
-
-			ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph();
-
-			// - Verification START -------------------------------------------
-
-			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
-			if (error[0] != null) {
-				throw new RuntimeException(error[0]);
-			}
-
-			Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>();
-			for (ExecutionJobVertex task : graph.getVerticesTopologically()) {
-				List<OperatorID> operatorIDs = task.getOperatorIDs();
-				for (int x = 0; x < operatorIDs.size(); x++) {
-					operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
-				}
-			}
-
-			// Verify that all tasks, which are part of the savepoint
-			// have a matching task deployment descriptor.
-			for (OperatorState operatorState : savepoint.getOperatorStates()) {
-				Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
-				Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
-
-				errMsg = "Missing task for savepoint state for operator "
-					+ operatorState.getOperatorID() + ".";
-				assertTrue(errMsg, taskTdds.size() > 0);
-
-				assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
+	private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
+		final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000);
+		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
+		final JobID jobId = jobGraph.getJobID();
+		StatefulCounter.resetForTest(parallelism);
 
-				for (TaskDeploymentDescriptor tdd : taskTdds) {
-					OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
+		MiniClusterResource cluster = clusterFactory.get();
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 
-					assertNotNull(subtaskState);
-				}
-			}
+		try {
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 
 			// Await state is restored
-			StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			StatefulCounter.getRestoreLatch().await();
 
 			// Await some progress after restore
-			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
-
-			// - Verification END ---------------------------------------------
+			StatefulCounter.getProgressLatch().await();
 
-			LOG.info("Cancelling job " + jobId + ".");
-			jobManager.tell(new CancelJob(jobId));
+			client.cancel(jobId);
 
-			LOG.info("Disposing savepoint " + savepointPath + ".");
-			Future<Object> disposeFuture = jobManager.ask(new DisposeSavepoint(savepointPath), deadline.timeLeft());
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> client.getJobStatus(jobId),
+				Time.milliseconds(50),
+				Deadline.now().plus(Duration.ofSeconds(30)),
+				status -> status == JobStatus.CANCELED,
+				TestingUtils.defaultScheduledExecutor()
+			).get();
 
-			errMsg = "Failed to dispose savepoint " + savepointPath + ".";
-			Object resp = Await.result(disposeFuture, deadline.timeLeft());
-			assertTrue(errMsg, resp.getClass() == getDisposeSavepointSuccess().getClass());
+			client.disposeSavepoint(savepointPath)
+				.get();
 
-			// - Verification START -------------------------------------------
-			// The checkpoint files
-			List<File> checkpointFiles = new ArrayList<>();
-
-			for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
-				for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					Collection<OperatorStateHandle> streamTaskState = subtaskState.getManagedOperatorState();
-
-					if (streamTaskState != null && !streamTaskState.isEmpty()) {
-						for (OperatorStateHandle osh : streamTaskState) {
-							FileStateHandle fileStateHandle = (FileStateHandle) osh.getDelegateStateHandle();
-							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
-						}
-					}
-				}
-			}
-
-			// The checkpoint files of the savepoint should have been discarded
-			for (File f : checkpointFiles) {
-				errMsg = "Checkpoint file " + f + " not cleaned up properly.";
-				assertFalse(errMsg, f.exists());
-			}
-
-			if (checkpointFiles.size() > 0) {
-				File parent = checkpointFiles.get(0).getParentFile();
-				errMsg = "Checkpoint parent directory " + parent + " not cleaned up properly.";
-				assertFalse(errMsg, parent.exists());
-			}
-
-			// All savepoints should have been cleaned up
-			errMsg = "Savepoints directory not cleaned up properly: " +
-				Arrays.toString(savepointRootDir.listFiles()) + ".";
-			assertEquals(errMsg, 0, savepointRootDir.listFiles().length);
-
-			// - Verification END ---------------------------------------------
+			assertFalse("Savepoint not properly cleaned up.", new File(savepointPath).exists());
 		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
+			cluster.after();
+			StatefulCounter.resetForTest(parallelism);
 		}
 	}
 
@@ -410,34 +220,23 @@ public class SavepointITCase extends TestLogger {
 		int numSlotsPerTaskManager = 1;
 		int parallelism = numTaskManagers * numSlotsPerTaskManager;
 
-		// Test deadline
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
-
-		final File tmpDir = folder.getRoot();
+		final File tmpDir = folder.newFolder();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
-		TestingCluster flink = null;
+		final Configuration config = new Configuration();
+		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				numTaskManagers,
+				numSlotsPerTaskManager
+			),
+			true);
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
-
-			LOG.info("Flink configuration: " + config + ".");
-
-			// Start Flink
-			flink = new TestingCluster(config);
-			LOG.info("Starting Flink cluster.");
-			flink.start();
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(
-				flink.leaderGateway().future(),
-				deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
 
 			// High value to ensure timeouts if restarted.
 			int numberOfRetries = 1000;
@@ -453,15 +252,17 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
 			try {
-				flink.submitJobAndWait(jobGraph, false);
+				client.setDetached(false);
+				client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 			} catch (Exception e) {
-				assertEquals(JobExecutionException.class, e.getClass());
-				assertEquals(FileNotFoundException.class, e.getCause().getClass());
+				Optional<JobExecutionException> expectedJobExecutionException = ExceptionUtils.findThrowable(e, JobExecutionException.class);
+				Optional<FileNotFoundException> expectedFileNotFoundException = ExceptionUtils.findThrowable(e, FileNotFoundException.class);
+				if (!(expectedJobExecutionException.isPresent() && expectedFileNotFoundException.isPresent())) {
+					throw e;
+				}
 			}
 		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
+			cluster.after();
 		}
 	}
 
@@ -480,15 +281,13 @@ public class SavepointITCase extends TestLogger {
 		int parallelism = 2;
 
 		// Test deadline
-		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
+		final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5));
 
-		final File tmpDir = folder.getRoot();
+		final File tmpDir = folder.newFolder();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
 		// Flink configuration
 		final Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
 		String savepointPath;
@@ -496,18 +295,18 @@ public class SavepointITCase extends TestLogger {
 		LOG.info("Flink configuration: " + config + ".");
 
 		// Start Flink
-		TestingCluster flink = new TestingCluster(config);
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				numTaskManagers,
+				numSlotsPerTaskManager
+			),
+			true);
+
+		LOG.info("Starting Flink cluster.");
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
 		try {
-			LOG.info("Starting Flink cluster.");
-			flink.start(true);
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(
-					flink.leaderGateway().future(),
-					deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
 			final StatefulCounter statefulCounter = new StatefulCounter();
 			StatefulCounter.resetForTest(parallelism);
 
@@ -536,38 +335,34 @@ public class SavepointITCase extends TestLogger {
 
 			JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
 
-			JobSubmissionResult submissionResult = flink.submitJobDetached(originalJobGraph);
+			client.setDetached(true);
+			JobSubmissionResult submissionResult = client.submitJob(originalJobGraph, SavepointITCase.class.getClassLoader());
 			JobID jobID = submissionResult.getJobID();
 
 			// wait for the Tasks to be ready
 			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			Future<Object> savepointPathFuture = jobManager.ask(new TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-			savepointPath = ((TriggerSavepointSuccess) Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-			Future<Object> savepointFuture = jobManager.ask(new RequestSavepoint(savepointPath), deadline.timeLeft());
-
-			((ResponseSavepoint) Await.result(savepointFuture, deadline.timeLeft())).savepoint();
+			savepointPath = client.triggerSavepoint(jobID, null).get();
 			LOG.info("Retrieved savepoint: " + savepointPath + ".");
 		} finally {
 			// Shut down the Flink cluster (thereby canceling the job)
 			LOG.info("Shutting down Flink cluster.");
-			flink.stop();
+			cluster.after();
 		}
 
 		// create a new TestingCluster to make sure we start with completely
 		// new resources
-		flink = new TestingCluster(config);
+		cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				numTaskManagers,
+				numSlotsPerTaskManager
+			),
+			true);
+		LOG.info("Restarting Flink cluster.");
+		cluster.before();
+		client = cluster.getClusterClient();
 		try {
-			LOG.info("Restarting Flink cluster.");
-			flink = new TestingCluster(config);
-
-			flink.start(true);
-
-			// Retrieve the job manager
-			LOG.info("Retrieving JobManager.");
-			ActorGateway jobManager = Await.result(flink.leaderGateway().future(), deadline.timeLeft());
-			LOG.info("JobManager: " + jobManager + ".");
-
 			// Reset static test helpers
 			StatefulCounter.resetForTest(parallelism);
 
@@ -598,14 +393,15 @@ public class SavepointITCase extends TestLogger {
 					"savepoint path " + savepointPath + " in detached mode.");
 
 			// Submit the job
-			flink.submitJobDetached(modifiedJobGraph);
+			client.setDetached(true);
+			client.submitJob(modifiedJobGraph, SavepointITCase.class.getClassLoader());
 			// Await state is restored
 			StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			// Await some progress after restore
 			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		} finally {
-			flink.stop();
+			cluster.after();
 		}
 	}
 
@@ -787,7 +583,6 @@ public class SavepointITCase extends TestLogger {
 		Configuration config = new Configuration();
 		config.addAll(jobGraph.getJobConfiguration());
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2 * jobGraph.getMaximumParallelism());
 		final File checkpointDir = new File(tmpDir, "checkpoints");
 		final File savepointDir = new File(tmpDir, "savepoints");
 
@@ -800,31 +595,40 @@ public class SavepointITCase extends TestLogger {
 		config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
 		config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-		TestingCluster cluster = new TestingCluster(config, false);
+		MiniClusterResource cluster = new MiniClusterResource(
+			new MiniClusterResource.MiniClusterResourceConfiguration(
+				config,
+				1,
+				2 * jobGraph.getMaximumParallelism()
+			),
+			true);
+		cluster.before();
+		ClusterClient<?> client = cluster.getClusterClient();
+
 		String savepointPath = null;
 		try {
-			cluster.start();
-
-			cluster.submitJobDetached(jobGraph);
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 			for (OneShotLatch latch : iterTestSnapshotWait) {
 				latch.await();
 			}
-			savepointPath = cluster.triggerSavepoint(jobGraph.getJobID());
+			savepointPath = client.triggerSavepoint(jobGraph.getJobID(), null).get();
 			source.cancel();
 
 			jobGraph = streamGraph.getJobGraph();
 			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			cluster.submitJobDetached(jobGraph);
+			client.setDetached(true);
+			client.submitJob(jobGraph, SavepointITCase.class.getClassLoader());
 			for (OneShotLatch latch : iterTestRestoreWait) {
 				latch.await();
 			}
 			source.cancel();
 		} finally {
 			if (null != savepointPath) {
-				cluster.disposeSavepoint(savepointPath);
+				client.disposeSavepoint(savepointPath).get();
 			}
-			cluster.stop();
+			cluster.after();
 		}
 	}
 
@@ -904,4 +708,30 @@ public class SavepointITCase extends TestLogger {
 			}
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	// Utilities
+	// ------------------------------------------------------------------------
+
+	private static class MiniClusterResourceFactory {
+		private final int numTaskManagers;
+		private final int numSlotsPerTaskManager;
+		private final Configuration config;
+
+		private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
+			this.numTaskManagers = numTaskManagers;
+			this.numSlotsPerTaskManager = numSlotsPerTaskManager;
+			this.config = config;
+		}
+
+		MiniClusterResource get() {
+			return new MiniClusterResource(
+				new MiniClusterResource.MiniClusterResourceConfiguration(
+					config,
+					numTaskManagers,
+					numSlotsPerTaskManager
+				),
+				true);
+		}
+	}
 }

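The MiniClusterResourceFactory above exists so that the savepoint phase and the restore phase each run against a fresh cluster built from identical settings, which is exactly how the ported test wires it up:

	MiniClusterResourceFactory clusterFactory =
		new MiniClusterResourceFactory(numTaskManagers, numSlotsPerTaskManager, config);

	String savepointPath = submitJobAndGetVerifiedSavepoint(clusterFactory, parallelism);
	restoreJobAndVerifyState(savepointPath, clusterFactory, parallelism);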

[2/9] flink git commit: [FLINK-9180] [conf] Remove REST_ prefix from rest options

Posted by ch...@apache.org.
[FLINK-9180] [conf] Remove REST_ prefix from rest options

This closes #5852.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e0c42ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e0c42ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e0c42ed

Branch: refs/heads/master
Commit: 9e0c42edceb4ae981018a0243669722d060d8f96
Parents: ae0b496
Author: zhangminglei <zm...@163.com>
Authored: Tue Apr 17 10:52:20 2018 +0800
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:14 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/client/LocalExecutor.java     |  6 +++---
 .../java/org/apache/flink/client/RemoteExecutor.java    |  2 +-
 .../java/org/apache/flink/client/cli/CliFrontend.java   |  4 ++--
 .../client/program/rest/RestClusterClientTest.java      |  4 ++--
 .../org/apache/flink/configuration/RestOptions.java     | 10 +++++-----
 .../org/apache/flink/docs/rest/RestAPIDocGenerator.java |  2 +-
 .../org/apache/flink/api/java/ExecutionEnvironment.java |  4 ++--
 .../flink/runtime/entrypoint/ClusterEntrypoint.java     |  2 +-
 .../highavailability/HighAvailabilityServicesUtils.java |  6 +++---
 .../runtime/minicluster/MiniClusterConfiguration.java   |  4 ++--
 .../flink/runtime/rest/FlinkHttpObjectAggregator.java   |  2 +-
 .../java/org/apache/flink/runtime/rest/RestClient.java  |  2 +-
 .../flink/runtime/rest/RestClientConfiguration.java     |  2 +-
 .../runtime/rest/RestServerEndpointConfiguration.java   | 12 ++++++------
 .../flink/runtime/rest/RestServerEndpointITCase.java    |  8 ++++----
 .../api/environment/LocalStreamEnvironment.java         |  6 +++---
 .../api/environment/StreamExecutionEnvironment.java     |  4 ++--
 .../org/apache/flink/test/util/MiniClusterResource.java |  6 +++---
 .../runtime/BigUserProgramJobSubmitITCase.java          |  2 +-
 .../flink/yarn/AbstractYarnClusterDescriptor.java       |  8 ++++----
 .../flink/yarn/entrypoint/YarnEntrypointUtils.java      |  6 +++---
 21 files changed, 51 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
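
The rename only touches the Java constant names: the underlying option keys already carry the "rest." prefix and are unchanged, so existing flink-conf.yaml entries keep working. For example:

	Configuration config = new Configuration();
	config.setString(RestOptions.ADDRESS, "localhost"); // key "rest.address", was REST_ADDRESS
	config.setInteger(RestOptions.PORT, 8081);          // key "rest.port", was REST_PORT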


http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index f209b45..01c281f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -126,8 +126,8 @@ public class LocalExecutor extends PlanExecutor {
 		final JobExecutorService newJobExecutorService;
 		if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
 
-			if (!configuration.contains(RestOptions.REST_PORT)) {
-				configuration.setInteger(RestOptions.REST_PORT, 0);
+			if (!configuration.contains(RestOptions.PORT)) {
+				configuration.setInteger(RestOptions.PORT, 0);
 			}
 
 			final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
@@ -145,7 +145,7 @@ public class LocalExecutor extends PlanExecutor {
 			final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
 			miniCluster.start();
 
-			configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
 			newJobExecutorService = miniCluster;
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index f6242e7..0a2f1b4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -113,7 +113,7 @@ public class RemoteExecutor extends PlanExecutor {
 
 		clientConfiguration.setString(JobManagerOptions.ADDRESS, inet.getHostName());
 		clientConfiguration.setInteger(JobManagerOptions.PORT, inet.getPort());
-		clientConfiguration.setInteger(RestOptions.REST_PORT, inet.getPort());
+		clientConfiguration.setInteger(RestOptions.PORT, inet.getPort());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 65f470b..7745ca0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1142,8 +1142,8 @@ public class CliFrontend {
 	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
 		config.setString(JobManagerOptions.ADDRESS, address.getHostString());
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
-		config.setString(RestOptions.REST_ADDRESS, address.getHostString());
-		config.setInteger(RestOptions.REST_PORT, address.getPort());
+		config.setString(RestOptions.ADDRESS, address.getHostString());
+		config.setInteger(RestOptions.PORT, address.getPort());
 	}
 
 	public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e2daad6..fd05cad 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -692,8 +692,8 @@ public class RestClusterClientTest extends TestLogger {
 
 		configuration.setString(JobManagerOptions.ADDRESS, configuredHostname);
 		configuration.setInteger(JobManagerOptions.PORT, configuredPort);
-		configuration.setString(RestOptions.REST_ADDRESS, configuredHostname);
-		configuration.setInteger(RestOptions.REST_PORT, configuredPort);
+		configuration.setString(RestOptions.ADDRESS, configuredHostname);
+		configuration.setInteger(RestOptions.PORT, configuredPort);
 
 		final DefaultCLI defaultCLI = new DefaultCLI(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index e7421c4..5cbd027 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -31,7 +31,7 @@ public class RestOptions {
 	/**
 	 * The address that the server binds itself to.
 	 */
-	public static final ConfigOption<String> REST_BIND_ADDRESS =
+	public static final ConfigOption<String> BIND_ADDRESS =
 		key("rest.bind-address")
 			.noDefaultValue()
 			.withDescription("The address that the server binds itself.");
@@ -39,7 +39,7 @@ public class RestOptions {
 	/**
 	 * The address that should be used by clients to connect to the server.
 	 */
-	public static final ConfigOption<String> REST_ADDRESS =
+	public static final ConfigOption<String> ADDRESS =
 		key("rest.address")
 			.noDefaultValue()
 			.withDeprecatedKeys(JobManagerOptions.ADDRESS.key())
@@ -48,7 +48,7 @@ public class RestOptions {
 	/**
 	 * The port that the server listens on / the client connects to.
 	 */
-	public static final ConfigOption<Integer> REST_PORT =
+	public static final ConfigOption<Integer> PORT =
 		key("rest.port")
 			.defaultValue(8081)
 			.withDescription("The port that the server listens on / the client connects to.");
@@ -94,7 +94,7 @@ public class RestOptions {
 	/**
 	 * The maximum content length that the server will handle.
 	 */
-	public static final ConfigOption<Integer> REST_SERVER_MAX_CONTENT_LENGTH =
+	public static final ConfigOption<Integer> SERVER_MAX_CONTENT_LENGTH =
 		key("rest.server.max-content-length")
 			.defaultValue(104_857_600)
 			.withDescription("The maximum content length in bytes that the server will handle.");
@@ -102,7 +102,7 @@ public class RestOptions {
 	/**
 	 * The maximum content length that the client will handle.
 	 */
-	public static final ConfigOption<Integer> REST_CLIENT_MAX_CONTENT_LENGTH =
+	public static final ConfigOption<Integer> CLIENT_MAX_CONTENT_LENGTH =
 		key("rest.client.max-content-length")
 			.defaultValue(104_857_600)
 			.withDescription("The maximum content length in bytes that the client will handle.");

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
----------------------------------------------------------------------
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
index 5545272..79bf677 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java
@@ -313,7 +313,7 @@ public class RestAPIDocGenerator {
 
 		static {
 			config = new Configuration();
-			config.setString(RestOptions.REST_ADDRESS, "localhost");
+			config.setString(RestOptions.ADDRESS, "localhost");
 			try {
 				restConfig = RestServerEndpointConfiguration.fromConfiguration(config);
 			} catch (ConfigurationException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 3ea99ea..3d858aa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -1126,9 +1126,9 @@ public abstract class ExecutionEnvironment {
 
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
-		if (!conf.contains(RestOptions.REST_PORT)) {
+		if (!conf.contains(RestOptions.PORT)) {
 			// explicitly set this option so that it's not set to 0 later
-			conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue());
+			conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
 		}
 
 		return createLocalEnvironment(conf, -1);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 0993cb6..42a3d1a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -704,7 +704,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		final int restPort = clusterConfiguration.getRestPort();
 
 		if (restPort >= 0) {
-			configuration.setInteger(RestOptions.REST_PORT, restPort);
+			configuration.setInteger(RestOptions.PORT, restPort);
 		}
 
 		return configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
index f19a421..918f1f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java
@@ -99,10 +99,10 @@ public class HighAvailabilityServicesUtils {
 					addressResolution,
 					configuration);
 
-				final String address = checkNotNull(configuration.getString(RestOptions.REST_ADDRESS),
+				final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
 					"%s must be set",
-					RestOptions.REST_ADDRESS.key());
-				final int port = configuration.getInteger(RestOptions.REST_PORT);
+					RestOptions.ADDRESS.key());
+				final int port = configuration.getInteger(RestOptions.PORT);
 				final boolean enableSSL = configuration.getBoolean(SecurityOptions.SSL_ENABLED);
 				final String protocol = enableSSL ? "https://" : "http://";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index fe76694..44a567b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -169,8 +169,8 @@ public class MiniClusterConfiguration {
 			final Configuration modifiedConfiguration = new Configuration(configuration);
 			modifiedConfiguration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
 			modifiedConfiguration.setString(
-				RestOptions.REST_ADDRESS,
-				modifiedConfiguration.getString(RestOptions.REST_ADDRESS, "localhost"));
+				RestOptions.ADDRESS,
+				modifiedConfiguration.getString(RestOptions.ADDRESS, "localhost"));
 
 			return new MiniClusterConfiguration(
 				modifiedConfiguration,

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
index 4ee0256..79ad598 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
@@ -59,7 +59,7 @@ public class FlinkHttpObjectAggregator extends org.apache.flink.shaded.netty4.io
 				false,
 				new ErrorResponseBody(String.format(
 					e.getMessage() + " Try to raise [%s]",
-					RestOptions.REST_SERVER_MAX_CONTENT_LENGTH.key())),
+					RestOptions.SERVER_MAX_CONTENT_LENGTH.key())),
 				HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
 				responseHeaders);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index df97f20..8f7dfed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -273,7 +273,7 @@ public class RestClient {
 			if (cause instanceof TooLongFrameException) {
 				jsonFuture.completeExceptionally(new TooLongFrameException(String.format(
 					cause.getMessage() + " Try to raise [%s]",
-					RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH.key())));
+					RestOptions.CLIENT_MAX_CONTENT_LENGTH.key())));
 			} else {
 				jsonFuture.completeExceptionally(cause);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index 17d4264..0e98e8f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -107,7 +107,7 @@ public final class RestClientConfiguration {
 
 		final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
-		int maxContentLength = config.getInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH);
+		int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
 
 		return new RestClientConfiguration(sslEngine, connectionTimeout, maxContentLength);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
index 8af76f5..542a937 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
@@ -81,7 +81,7 @@ public final class RestServerEndpointConfiguration {
 	}
 
 	/**
-	 * @see RestOptions#REST_ADDRESS
+	 * @see RestOptions#ADDRESS
 	 */
 	public String getRestAddress() {
 		return restAddress;
@@ -147,12 +147,12 @@ public final class RestServerEndpointConfiguration {
 	public static RestServerEndpointConfiguration fromConfiguration(Configuration config) throws ConfigurationException {
 		Preconditions.checkNotNull(config);
 
-		final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.REST_ADDRESS),
+		final String restAddress = Preconditions.checkNotNull(config.getString(RestOptions.ADDRESS),
 			"%s must be set",
-			RestOptions.REST_ADDRESS.key());
+			RestOptions.ADDRESS.key());
 
-		final String restBindAddress = config.getString(RestOptions.REST_BIND_ADDRESS);
-		final int port = config.getInteger(RestOptions.REST_PORT);
+		final String restBindAddress = config.getString(RestOptions.BIND_ADDRESS);
+		final int port = config.getInteger(RestOptions.PORT);
 
 		SSLEngine sslEngine = null;
 		final boolean enableSSL = config.getBoolean(SecurityOptions.SSL_ENABLED);
@@ -173,7 +173,7 @@ public final class RestServerEndpointConfiguration {
 			config.getString(WebOptions.UPLOAD_DIR,	config.getString(WebOptions.TMP_DIR)),
 			"flink-web-upload");
 
-		final int maxContentLength = config.getInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH);
+		final int maxContentLength = config.getInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH);
 
 		final Map<String, String> responseHeaders = Collections.singletonMap(
 			HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 88fdeb8..09e36de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -114,11 +114,11 @@ public class RestServerEndpointITCase extends TestLogger {
 	@Before
 	public void setup() throws Exception {
 		Configuration config = new Configuration();
-		config.setInteger(RestOptions.REST_PORT, 0);
-		config.setString(RestOptions.REST_ADDRESS, "localhost");
+		config.setInteger(RestOptions.PORT, 0);
+		config.setString(RestOptions.ADDRESS, "localhost");
 		config.setString(WebOptions.UPLOAD_DIR, temporaryFolder.newFolder().getCanonicalPath());
-		config.setInteger(RestOptions.REST_SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
-		config.setInteger(RestOptions.REST_CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
+		config.setInteger(RestOptions.SERVER_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
+		config.setInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH, TEST_REST_MAX_CONTENT_LENGTH);
 
 		RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config);
 		RestClientConfiguration clientConfig = RestClientConfiguration.fromConfiguration(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index b9c76b2..8295e3c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -99,8 +99,8 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.configuration);
 
-		if (!configuration.contains(RestOptions.REST_PORT)) {
-			configuration.setInteger(RestOptions.REST_PORT, 0);
+		if (!configuration.contains(RestOptions.PORT)) {
+			configuration.setInteger(RestOptions.PORT, 0);
 		}
 
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
@@ -116,7 +116,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
 		try {
 			miniCluster.start();
-			configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+			configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
 			return miniCluster.executeJobBlocking(jobGraph);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 7372fe8..624c938 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1680,9 +1680,9 @@ public abstract class StreamExecutionEnvironment {
 
 		conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
-		if (!conf.contains(RestOptions.REST_PORT)) {
+		if (!conf.contains(RestOptions.PORT)) {
 			// explicitly set this option so that it's not set to 0 later
-			conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue());
+			conf.setInteger(RestOptions.PORT, RestOptions.PORT.defaultValue());
 		}
 
 		return createLocalEnvironment(defaultLocalParallelism, conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 531a3c7..324c9ee 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -221,7 +221,7 @@ public class MiniClusterResource extends ExternalResource {
 		}
 
 		// set rest port to 0 to avoid clashes with concurrent MiniClusters
-		configuration.setInteger(RestOptions.REST_PORT, 0);
+		configuration.setInteger(RestOptions.PORT, 0);
 
 		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
 			.setConfiguration(configuration)
@@ -234,7 +234,7 @@ public class MiniClusterResource extends ExternalResource {
 		miniCluster.start();
 
 		// update the port of the rest endpoint
-		configuration.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+		configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 
 		jobExecutorService = miniCluster;
 		if (enableClusterClient) {
@@ -242,7 +242,7 @@ public class MiniClusterResource extends ExternalResource {
 		}
 		Configuration restClientConfig = new Configuration();
 		restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
-		restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort());
+		restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
 	}
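
A sketch of how a test might consume a configuration assembled like restClusterClientConfig above; the RestClusterClient constructor shape matches its use in BigUserProgramJobSubmitITCase below, while StandaloneClusterId.getInstance(), the hardcoded address/port, and shutdown() are assumptions:

import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;

public class RestClusterClientSketch {
	public static void main(String[] args) throws Exception {
		Configuration clientConfig = new Configuration();
		clientConfig.setString(JobManagerOptions.ADDRESS, "localhost"); // assumed mini-cluster host
		clientConfig.setInteger(RestOptions.PORT, 8081);                // assumed known REST port

		RestClusterClient<StandaloneClusterId> client =
			new RestClusterClient<>(clientConfig, StandaloneClusterId.getInstance());
		try {
			// submit or query jobs against the REST endpoint here
		} finally {
			client.shutdown();
		}
	}
}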
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
index b10dbec..5fb3e4d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java
@@ -73,7 +73,7 @@ public class BigUserProgramJobSubmitITCase extends TestLogger {
 
 			final Configuration clientConfig = new Configuration();
 			clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost());
-			clientConfig.setInteger(RestOptions.REST_PORT, restAddress.getPort());
+			clientConfig.setInteger(RestOptions.PORT, restAddress.getPort());
 
 			CLIENT = new RestClusterClient<>(
 				clientConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 8538c1f..aec5fdb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -382,8 +382,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
 			flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);
 
-			flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
-			flinkConfiguration.setInteger(RestOptions.REST_PORT, rpcPort);
+			flinkConfiguration.setString(RestOptions.ADDRESS, host);
+			flinkConfiguration.setInteger(RestOptions.PORT, rpcPort);
 
 			return createYarnClusterClient(
 				this,
@@ -542,8 +542,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
 		flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
 
-		flinkConfiguration.setString(RestOptions.REST_ADDRESS, host);
-		flinkConfiguration.setInteger(RestOptions.REST_PORT, port);
+		flinkConfiguration.setString(RestOptions.ADDRESS, host);
+		flinkConfiguration.setInteger(RestOptions.PORT, port);
 
 		// the Flink cluster is deployed in YARN. Represent cluster
 		return createYarnClusterClient(

http://git-wip-us.apache.org/repos/asf/flink/blob/9e0c42ed/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index c50c043..25d138d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -95,7 +95,7 @@ public class YarnEntrypointUtils {
 			ApplicationConstants.Environment.NM_HOST.key());
 
 		configuration.setString(JobManagerOptions.ADDRESS, hostname);
-		configuration.setString(RestOptions.REST_ADDRESS, hostname);
+		configuration.setString(RestOptions.ADDRESS, hostname);
 
 		// TODO: Support port ranges for the AM
 //		final String portRange = configuration.getString(
@@ -115,9 +115,9 @@ public class YarnEntrypointUtils {
 			configuration.setInteger(WebOptions.PORT, 0);
 		}
 
-		if (configuration.getInteger(RestOptions.REST_PORT) >= 0) {
+		if (configuration.getInteger(RestOptions.PORT) >= 0) {
 			// set the REST port to 0 to select it randomly
-			configuration.setInteger(RestOptions.REST_PORT, 0);
+			configuration.setInteger(RestOptions.PORT, 0);
 		}
 
 		// if the user has set the deprecated YARN-specific config keys, we add the
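
Setting the option to 0 hands port selection to the OS; the plain-Java behaviour the entrypoint relies on can be seen in isolation:

import java.net.ServerSocket;

public class PortZeroSketch {
	public static void main(String[] args) throws Exception {
		// Port 0 means "bind to any free port"; the actual port is known
		// only after binding.
		try (ServerSocket socket = new ServerSocket(0)) {
			System.out.println("bound to port " + socket.getLocalPort());
		}
	}
}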


[9/9] flink git commit: [FLINK-9208][tests] fix naming of StreamNetworkThroughputBenchmarkTest

Posted by ch...@apache.org.
[FLINK-9208][tests] fix naming of StreamNetworkThroughputBenchmarkTest

This closes #5873.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/236d2898
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/236d2898
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/236d2898

Branch: refs/heads/master
Commit: 236d28983883ace9a71932a74ce736bec52b9747
Parents: 727370a
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 17:04:38 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../StreamNetworkThroughputBenchmarkTest.java   | 98 ++++++++++++++++++++
 .../StreamNetworkThroughputBenchmarkTests.java  | 98 --------------------
 2 files changed, 98 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/236d2898/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
new file mode 100644
index 0000000..ba8fe27
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.benchmark;
+
+import org.junit.Test;
+
+/**
+ * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
+ */
+public class StreamNetworkThroughputBenchmarkTest {
+	@Test
+	public void pointToPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(1, 1, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void largeLocalMode() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		env.setUp(4, 10, 100, true);
+		env.executeBenchmark(10_000_000);
+		env.tearDown();
+	}
+
+	@Test
+	public void largeRemoteMode() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		env.setUp(4, 10, 100, false);
+		env.executeBenchmark(10_000_000);
+		env.tearDown();
+	}
+
+	@Test
+	public void largeRemoteAlwaysFlush() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		env.setUp(1, 1, 0, false);
+		env.executeBenchmark(1_000_000);
+		env.tearDown();
+	}
+
+	@Test
+	public void pointToMultiPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(1, 100, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void multiPointToPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(4, 1, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+
+	@Test
+	public void multiPointToMultiPointBenchmark() throws Exception {
+		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
+		benchmark.setUp(4, 100, 100);
+		try {
+			benchmark.executeBenchmark(1_000);
+		}
+		finally {
+			benchmark.tearDown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/236d2898/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
deleted file mode 100644
index a60fa3c..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io.benchmark;
-
-import org.junit.Test;
-
-/**
- * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
- */
-public class StreamNetworkThroughputBenchmarkTests {
-	@Test
-	public void pointToPointBenchmark() throws Exception {
-		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
-		benchmark.setUp(1, 1, 100);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-
-	@Test
-	public void largeLocalMode() throws Exception {
-		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
-		env.setUp(4, 10, 100, true);
-		env.executeBenchmark(10_000_000);
-		env.tearDown();
-	}
-
-	@Test
-	public void largeRemoteMode() throws Exception {
-		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
-		env.setUp(4, 10, 100, false);
-		env.executeBenchmark(10_000_000);
-		env.tearDown();
-	}
-
-	@Test
-	public void largeRemoteAlwaysFlush() throws Exception {
-		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
-		env.setUp(1, 1, 0, false);
-		env.executeBenchmark(1_000_000);
-		env.tearDown();
-	}
-
-	@Test
-	public void pointToMultiPointBenchmark() throws Exception {
-		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
-		benchmark.setUp(1, 100, 100);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-
-	@Test
-	public void multiPointToPointBenchmark() throws Exception {
-		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
-		benchmark.setUp(4, 1, 100);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-
-	@Test
-	public void multiPointToMultiPointBenchmark() throws Exception {
-		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
-		benchmark.setUp(4, 100, 100);
-		try {
-			benchmark.executeBenchmark(1_000);
-		}
-		finally {
-			benchmark.tearDown();
-		}
-	}
-}


[5/9] flink git commit: [FLINK-8703][tests] Expose WebUI port

Posted by ch...@apache.org.
[FLINK-8703][tests] Expose WebUI port


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31f40360
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31f40360
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31f40360

Branch: refs/heads/master
Commit: 31f4036042a4e4c7cf1ff8a057599e4abccf4f4c
Parents: dca52bf
Author: zentol <ch...@apache.org>
Authored: Wed Mar 7 11:14:20 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/test/util/MiniClusterResource.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31f40360/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 324c9ee..160c1d1 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -75,6 +75,8 @@ public class MiniClusterResource extends ExternalResource {
 
 	private TestEnvironment executionEnvironment;
 
+	private int webUIPort = -1;
+
 	public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) {
 		this(miniClusterResourceConfiguration, false);
 	}
@@ -129,6 +131,10 @@ public class MiniClusterResource extends ExternalResource {
 		return executionEnvironment;
 	}
 
+	public int getWebUIPort() {
+		return webUIPort;
+	}
+
 	@Override
 	public void before() throws Exception {
 
@@ -205,6 +211,10 @@ public class MiniClusterResource extends ExternalResource {
 		Configuration restClientConfig = new Configuration();
 		restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort());
 		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
+
+		if (flinkMiniCluster.webMonitor().isDefined()) {
+			webUIPort = flinkMiniCluster.webMonitor().get().getServerPort();
+		}
 	}
 
 	private void startMiniCluster() throws Exception {
@@ -244,6 +254,8 @@ public class MiniClusterResource extends ExternalResource {
 		restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost());
 		restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
 		this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig);
+
+		webUIPort = miniCluster.getRestAddress().getPort();
 	}
 
 	/**


[7/9] flink git commit: [FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages

Posted by ch...@apache.org.
[FLINK-9206][checkpoints] add job IDs to CheckpointCoordinator log messages

This closes #5872.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/727370aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/727370aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/727370aa

Branch: refs/heads/master
Commit: 727370aacf63cefc6aed7c46dc2d63517e4b708d
Parents: b0ee05e
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Wed Apr 18 15:01:32 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 57 +++++++++++---------
 1 file changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/727370aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 59916fd..4ddac003 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -316,7 +316,7 @@ public class CheckpointCoordinator {
 		synchronized (lock) {
 			if (!shutdown) {
 				shutdown = true;
-				LOG.info("Stopping checkpoint coordinator for job " + job);
+				LOG.info("Stopping checkpoint coordinator for job {}.", job);
 
 				periodicScheduling = false;
 				triggerRequestQueued = false;
@@ -414,7 +414,7 @@ public class CheckpointCoordinator {
 			if (!props.forceCheckpoint()) {
 				// sanity check: there should never be more than one trigger request queued
 				if (triggerRequestQueued) {
-					LOG.warn("Trying to trigger another checkpoint while one was queued already");
+					LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 					return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 				}
 
@@ -455,8 +455,9 @@ public class CheckpointCoordinator {
 			if (ee != null && ee.getState() == ExecutionState.RUNNING) {
 				executions[i] = ee;
 			} else {
-				LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
-						tasksToTrigger[i].getTaskNameWithSubtaskIndex());
+				LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+						tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
+						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -470,8 +471,9 @@ public class CheckpointCoordinator {
 			if (ee != null) {
 				ackTasks.put(ee.getAttemptId(), ev);
 			} else {
-				LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
-						ev.getTaskNameWithSubtaskIndex());
+				LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
+						ev.getTaskNameWithSubtaskIndex(),
+						job);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 			}
 		}
@@ -498,7 +500,10 @@ public class CheckpointCoordinator {
 			}
 			catch (Throwable t) {
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-				LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+				LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
+						job,
+						numUnsuccessful,
+						t);
 				return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 			}
 
@@ -526,7 +531,7 @@ public class CheckpointCoordinator {
 					// only do the work if the checkpoint is not discarded anyways
 					// note that checkpoint completion discards the pending checkpoint object
 					if (!checkpoint.isDiscarded()) {
-						LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+						LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
 
 						checkpoint.abortExpired();
 						pendingCheckpoints.remove(checkpointID);
@@ -547,7 +552,7 @@ public class CheckpointCoordinator {
 					}
 					else if (!props.forceCheckpoint()) {
 						if (triggerRequestQueued) {
-							LOG.warn("Trying to trigger another checkpoint while one was queued already");
+							LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 							return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 						}
 
@@ -579,7 +584,7 @@ public class CheckpointCoordinator {
 						}
 					}
 
-					LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+					LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
 
 					pendingCheckpoints.put(checkpointID, checkpoint);
 
@@ -620,8 +625,8 @@ public class CheckpointCoordinator {
 				}
 
 				int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-				LOG.warn("Failed to trigger checkpoint {}. ({} consecutive failed attempts so far)",
-						checkpointID, numUnsuccessful, t);
+				LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
+						checkpointID, job, numUnsuccessful, t);
 
 				if (!checkpoint.isDiscarded()) {
 					checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
@@ -673,7 +678,7 @@ public class CheckpointCoordinator {
 			checkpoint = pendingCheckpoints.remove(checkpointId);
 
 			if (checkpoint != null && !checkpoint.isDiscarded()) {
-				LOG.info("Decline checkpoint {} by task {}.", checkpointId, message.getTaskExecutionId());
+				LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
 				discardCheckpoint(checkpoint, message.getReason());
 			}
 			else if (checkpoint != null) {
@@ -684,12 +689,12 @@ public class CheckpointCoordinator {
 			else if (LOG.isDebugEnabled()) {
 				if (recentPendingCheckpoints.contains(checkpointId)) {
 					// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
-					LOG.debug("Received another decline message for now expired checkpoint attempt {} : {}",
-							checkpointId, reason);
+					LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
+							checkpointId, job, reason);
 				} else {
 					// message is for an unknown checkpoint. might be so old that we don't even remember it any more
-					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} : {}",
-							checkpointId, reason);
+					LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
+							checkpointId, job, reason);
 				}
 			}
 		}
@@ -834,7 +839,7 @@ public class CheckpointCoordinator {
 							try {
 								completedCheckpoint.discardOnFailedStoring();
 							} catch (Throwable t) {
-								LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
+								LOG.warn("Could not properly discard completed checkpoint {} of job {}.", completedCheckpoint.getCheckpointID(), job, t);
 							}
 						}
 					});
@@ -857,7 +862,7 @@ public class CheckpointCoordinator {
 		// the 'min delay between checkpoints'
 		lastCheckpointCompletionNanos = System.nanoTime();
 
-		LOG.info("Completed checkpoint {} ({} bytes in {} ms).", checkpointId,
+		LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
 			completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());
 
 		if (LOG.isDebugEnabled()) {
@@ -1007,7 +1012,7 @@ public class CheckpointCoordinator {
 				completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 			}
 
-			LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
+			LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);
 
 			// Restore from the latest checkpoint
 			CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1020,7 +1025,7 @@ public class CheckpointCoordinator {
 				}
 			}
 
-			LOG.info("Restoring from latest valid checkpoint: {}.", latest);
+			LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest);
 
 			// re-assign the task states
 			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
@@ -1076,8 +1081,8 @@ public class CheckpointCoordinator {
 
 		Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");
 
-		LOG.info("Starting job from savepoint {} ({})",
-				savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
+		LOG.info("Starting job {} from savepoint {} ({})",
+				job, savepointPointer, (allowNonRestored ? "allowing non restored state" : ""));
 
 		final CompletedCheckpointStorageLocation checkpointLocation = checkpointStorage.resolveCheckpoint(savepointPointer);
 
@@ -1091,7 +1096,7 @@ public class CheckpointCoordinator {
 		long nextCheckpointId = savepoint.getCheckpointID() + 1;
 		checkpointIdCounter.setCount(nextCheckpointId);
 
-		LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
+		LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
 
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
 	}
@@ -1214,7 +1219,7 @@ public class CheckpointCoordinator {
 				triggerCheckpoint(System.currentTimeMillis(), true);
 			}
 			catch (Exception e) {
-				LOG.error("Exception while triggering checkpoint.", e);
+				LOG.error("Exception while triggering checkpoint for job {}.", job, e);
 			}
 		}
 	}
@@ -1233,7 +1238,7 @@ public class CheckpointCoordinator {
 
 		final String reason = (cause != null) ? cause.getMessage() : "";
 
-		LOG.info("Discarding checkpoint {} because: {}", checkpointId, reason);
+		LOG.info("Discarding checkpoint {} of job {} because: {}", checkpointId, job, reason);
 
 		pendingCheckpoint.abortDeclined();
 		rememberRecentCheckpointId(checkpointId);
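
Several of the rewritten statements pass the Throwable after the {} arguments (e.g. the "Failed to trigger checkpoint" warnings); SLF4J treats a trailing Throwable without a matching placeholder as the exception to attach, stack trace included. A self-contained sketch of that behaviour:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TrailingThrowableSketch {
	private static final Logger LOG = LoggerFactory.getLogger(TrailingThrowableSketch.class);

	public static void main(String[] args) {
		Throwable t = new IllegalStateException("boom"); // stand-in failure
		// Two placeholders, three arguments: the last one has no {} and is
		// therefore logged as the exception, including its stack trace.
		LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
				"sketch-job-id", 3, t);
	}
}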


[4/9] flink git commit: [FLINK-9199][REST] Fix URLs and remove subtask index parameter

Posted by ch...@apache.org.
[FLINK-9199][REST] Fix URLs and remove subtask index parameter

This closes #5865.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dca52bf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dca52bf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dca52bf7

Branch: refs/heads/master
Commit: dca52bf783744f98c2b8c40480248d5f11b91171
Parents: 469cab4
Author: Rong Rong <ro...@uber.com>
Authored: Tue Apr 17 23:44:02 2018 -0700
Committer: zentol <ch...@apache.org>
Committed: Thu Apr 19 09:58:15 2018 +0200

----------------------------------------------------------------------
 docs/_includes/generated/rest_dispatcher.html   | 181 ++++++++++++++++++-
 ...taskExecutionAttemptAccumulatorsHeaders.java |   2 +-
 .../SubtaskExecutionAttemptDetailsHeaders.java  |   2 +-
 .../AggregatedSubtaskMetricsHeaders.java        |   2 +-
 .../AggregatedSubtaskMetricsParameters.java     |   5 +-
 5 files changed, 175 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dca52bf7/docs/_includes/generated/rest_dispatcher.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/rest_dispatcher.html b/docs/_includes/generated/rest_dispatcher.html
index 5889f88..8ac36e1 100644
--- a/docs/_includes/generated/rest_dispatcher.html
+++ b/docs/_includes/generated/rest_dispatcher.html
@@ -311,6 +311,56 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/metrics</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">description</td>
+    </tr>
+    <tr>
+      <td colspan="2">Query parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>get</code> (optional): description</li>
+<li><code>agg</code> (optional): description</li>
+<li><code>jobs</code> (optional): description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#-790902571">Request</button>
+        <div id="-790902571" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#128306770">Response</button>
+        <div id="128306770" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" colspan="2"><strong>/jobs/overview</strong></td>
     </tr>
     <tr>
@@ -2208,6 +2258,67 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/metrics</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">description</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - description</li>
+<li><code>vertexid</code> - description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">Query parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>get</code> (optional): description</li>
+<li><code>agg</code> (optional): description</li>
+<li><code>subtasks</code> (optional): description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#265134755">Request</button>
+        <div id="265134755" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#1184344096">Response</button>
+        <div id="1184344096" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex</strong></td>
     </tr>
     <tr>
@@ -2313,7 +2424,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/attempt</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -2337,8 +2448,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1633948804">Request</button>
-        <div id="1633948804" class="collapse">
+        <button data-toggle="collapse" data-target="#168850740">Request</button>
+        <div id="168850740" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -2348,8 +2459,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#-384785209">Response</button>
-        <div id="-384785209" class="collapse">
+        <button data-toggle="collapse" data-target="#-1849883273">Response</button>
+        <div id="-1849883273" class="collapse">
           <pre>
             <code>
 {
@@ -2419,7 +2530,7 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
-      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/attempt/accumulators</strong></td>
+      <td class="text-left" colspan="2"><strong>/jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt/accumulators</strong></td>
     </tr>
     <tr>
       <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
@@ -2443,8 +2554,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1869002000">Request</button>
-        <div id="1869002000" class="collapse">
+        <button data-toggle="collapse" data-target="#-1303317920">Request</button>
+        <div id="-1303317920" class="collapse">
           <pre>
             <code>
 {}            </code>
@@ -2454,8 +2565,8 @@
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" data-target="#1717045676">Response</button>
-        <div id="1717045676" class="collapse">
+        <button data-toggle="collapse" data-target="#-1455274244">Response</button>
+        <div id="-1455274244" class="collapse">
           <pre>
             <code>
 {
@@ -3038,6 +3149,56 @@
 <table class="table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" colspan="2"><strong>/taskmanagers/metrics</strong></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">description</td>
+    </tr>
+    <tr>
+      <td colspan="2">Query parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>get</code> (optional): description</li>
+<li><code>agg</code> (optional): description</li>
+<li><code>taskmanagers</code> (optional): description</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#521351018">Request</button>
+        <div id="521351018" class="collapse">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <button data-toggle="collapse" data-target="#1440560359">Response</button>
+        <div id="1440560359" class="collapse">
+          <pre>
+            <code>
+{
+  "type" : "any"
+}            </code>
+          </pre>
+         </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" colspan="2"><strong>/taskmanagers/:taskmanagerid</strong></td>
     </tr>
     <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/dca52bf7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
index 662e87c..5cc159c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptAccumulatorsHeaders.java
@@ -36,7 +36,7 @@ public class SubtaskExecutionAttemptAccumulatorsHeaders implements MessageHeader
 	private static final SubtaskExecutionAttemptAccumulatorsHeaders INSTANCE = new SubtaskExecutionAttemptAccumulatorsHeaders();
 
 	public static final String URL = String.format(
-		"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s/accumulators",
+		"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/:%s/accumulators",
 		JobIDPathParameter.KEY,
 		JobVertexIdPathParameter.KEY,
 		SubtaskIndexPathParameter.KEY,

http://git-wip-us.apache.org/repos/asf/flink/blob/dca52bf7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
index aa65007..6f8eb21 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
@@ -36,7 +36,7 @@ public class SubtaskExecutionAttemptDetailsHeaders implements MessageHeaders<Emp
 	private static final SubtaskExecutionAttemptDetailsHeaders INSTANCE = new SubtaskExecutionAttemptDetailsHeaders();
 
 	public static final String URL = String.format(
-		"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s",
+		"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/:%s",
 		JobIDPathParameter.KEY,
 		JobVertexIdPathParameter.KEY,
 		SubtaskIndexPathParameter.KEY,

http://git-wip-us.apache.org/repos/asf/flink/blob/dca52bf7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
index e1d0790..bfeab5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsHeaders.java
@@ -38,7 +38,7 @@ public class AggregatedSubtaskMetricsHeaders extends AbstractAggregatedMetricsHe
 
 	@Override
 	public String getTargetRestEndpointURL() {
-		return "/jobs/" + JobIDPathParameter.KEY + "/vertices/" + JobVertexIdPathParameter.KEY + "/subtasks/metrics";
+		return "/jobs/:" + JobIDPathParameter.KEY + "/vertices/:" + JobVertexIdPathParameter.KEY + "/subtasks/metrics";
 	}
 
 	public static AggregatedSubtaskMetricsHeaders getInstance() {

http://git-wip-us.apache.org/repos/asf/flink/blob/dca52bf7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
index 34e1b52..f3969b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/AggregatedSubtaskMetricsParameters.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.messages.job.metrics;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
-import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -34,7 +33,6 @@ public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetric
 
 	private final JobIDPathParameter jobId = new JobIDPathParameter();
 	private final JobVertexIdPathParameter vertexId = new JobVertexIdPathParameter();
-	private final SubtaskIndexPathParameter subtaskIndex = new SubtaskIndexPathParameter();
 
 	public AggregatedSubtaskMetricsParameters() {
 		super(new SubtasksFilterQueryParameter());
@@ -44,8 +42,7 @@ public class AggregatedSubtaskMetricsParameters extends AbstractAggregatedMetric
 	public Collection<MessagePathParameter<?>> getPathParameters() {
 		return Collections.unmodifiableCollection(Arrays.asList(
 			jobId,
-			vertexId,
-			subtaskIndex
+			vertexId
 		));
 	}
 }
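
With the missing colons restored, every variable segment of the template is a parameter placeholder. The generated docs above confirm the keys (jobid, vertexid, subtaskindex, attempt), so the corrected format call expands as follows; a sketch using those keys as literals:

public class UrlTemplateSketch {
	public static void main(String[] args) {
		String url = String.format(
			"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/:%s/accumulators",
			"jobid", "vertexid", "subtaskindex", "attempt");
		// Prints: /jobs/:jobid/vertices/:vertexid/subtasks/:subtaskindex/attempts/:attempt/accumulators
		System.out.println(url);
	}
}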