You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/11 16:15:00 UTC

[1/5] flink git commit: [FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Repository: flink
Updated Branches:
  refs/heads/master 63343fb8e -> d7ee60330


[FLINK-8328] [flip6] Move Yarn ApplicationStatus polling out of YarnClusterClient

Introduce YarnApplicationStatusMonitor which does the Yarn ApplicationStatus polling in
the FlinkYarnSessionCli. This decouples the YarnClusterClient from the actual communication
with Yarn and, thus, gives a better separation of concerns.

This closes #5215.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ce5b98d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ce5b98d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ce5b98d

Branch: refs/heads/master
Commit: 2ce5b98da04cb3850ff91757cc4b74a98b8ce082
Parents: 63343fb
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Dec 7 13:57:24 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 16:59:19 2018 +0100

----------------------------------------------------------------------
 ...CliFrontendYarnAddressConfigurationTest.java |   2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   2 +-
 .../apache/flink/yarn/YarnClusterClient.java    | 110 +------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 283 ++++++++++++-------
 .../yarn/cli/YarnApplicationStatusMonitor.java  | 101 +++++++
 6 files changed, 287 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 1fed554..1b457a5 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -379,7 +379,7 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 				}
 
 				@Override
-				protected YarnClient getYarnClient() {
+				public YarnClient getYarnClient() {
 					return new TestYarnClient();
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index ae39d0a..e0299aa 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -656,7 +656,7 @@ public abstract class YarnTestBase extends TestLogger {
 			throw new RuntimeException("Runner failed", runner.getRunnerError());
 		}
 		Assert.assertTrue("During the timeout period of " + startTimeoutSeconds + " seconds the " +
-				"expected string did not show up", expectedStringSeen);
+				"expected string \"" + terminateAfterString + "\" did not show up.", expectedStringSeen);
 
 		LOG.info("Test was successful");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index a910148..86ddd9b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -308,7 +308,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	 * Gets a Hadoop Yarn client.
 	 * @return Returns a YarnClient which has to be shutdown manually
 	 */
-	protected YarnClient getYarnClient() {
+	public YarnClient getYarnClient() {
 		YarnClient yarnClient = YarnClient.createYarnClient();
 		yarnClient.init(conf);
 		yarnClient.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index ceca29d..80d0943 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -40,7 +40,6 @@ import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
-import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -68,12 +67,9 @@ public class YarnClusterClient extends ClusterClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
 
-	private static final int POLLING_THREAD_INTERVAL_MS = 1000;
-
 	private YarnClient yarnClient;
 
 	private Thread clientShutdownHook = new ClientShutdownHook();
-	private PollingThread pollingRunner;
 
 	//---------- Class internal fields -------------------
 
@@ -130,10 +126,6 @@ public class YarnClusterClient extends ClusterClient {
 			actorSystemLoader,
 			highAvailabilityServices);
 
-		this.pollingRunner = new PollingThread(yarnClient, appId);
-		this.pollingRunner.setDaemon(true);
-		this.pollingRunner.start();
-
 		Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 	}
 
@@ -158,14 +150,6 @@ public class YarnClusterClient extends ClusterClient {
 			// we are already in the shutdown hook
 		}
 
-		try {
-			pollingRunner.stopRunner();
-			pollingRunner.join(1000);
-		} catch (InterruptedException e) {
-			LOG.warn("Shutdown of the polling runner was interrupted", e);
-			Thread.currentThread().interrupt();
-		}
-
 		isConnected = false;
 	}
 
@@ -254,34 +238,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
-	public ApplicationStatus getApplicationStatus() {
-		if (!isConnected) {
-			throw new IllegalStateException("The cluster has been connected to the ApplicationMaster.");
-		}
-		ApplicationReport lastReport = null;
-		if (pollingRunner == null) {
-			LOG.warn("YarnClusterClient.getApplicationStatus() has been called on an uninitialized cluster." +
-					"The system might be in an erroneous state");
-		} else {
-			lastReport = pollingRunner.getLastReport();
-		}
-		if (lastReport == null) {
-			LOG.warn("YarnClusterClient.getApplicationStatus() has been called on a cluster that didn't receive a status so far." +
-					"The system might be in an erroneous state");
-			return ApplicationStatus.UNKNOWN;
-		} else {
-			YarnApplicationState appState = lastReport.getYarnApplicationState();
-			ApplicationStatus status =
-				(appState == YarnApplicationState.FAILED || appState == YarnApplicationState.KILLED) ?
-					ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;
-			if (status != ApplicationStatus.SUCCEEDED) {
-				LOG.warn("YARN reported application state {}", appState);
-				LOG.warn("Diagnostics: {}", lastReport.getDiagnostics());
-			}
-			return status;
-		}
-	}
-
 	@Override
 	public List<String> getNewMessages() {
 
@@ -371,7 +327,7 @@ public class YarnClusterClient extends ClusterClient {
 		try {
 			Future<Object> response =
 				Patterns.ask(applicationClient.get(),
-					new YarnMessages.LocalStopYarnSession(getApplicationStatus(),
+					new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
 							"Flink YARN Client requested shutdown"),
 					new Timeout(akkaDuration));
 			Await.ready(response, akkaDuration);
@@ -393,14 +349,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 
 		try {
-			pollingRunner.stopRunner();
-			pollingRunner.join(1000);
-		} catch (InterruptedException e) {
-			LOG.warn("Shutdown of the polling runner was interrupted", e);
-			Thread.currentThread().interrupt();
-		}
-
-		try {
 			ApplicationReport appReport = yarnClient.getApplicationReport(appId);
 
 			LOG.info("Application " + appId + " finished with state " + appReport
@@ -443,62 +391,6 @@ public class YarnClusterClient extends ClusterClient {
 		}
 	}
 
-	// -------------------------- Polling ------------------------
-
-	private static class PollingThread extends Thread {
-
-		AtomicBoolean running = new AtomicBoolean(true);
-		private YarnClient yarnClient;
-		private ApplicationId appId;
-
-		// ------- status information stored in the polling thread
-		private final Object lock = new Object();
-		private ApplicationReport lastReport;
-
-		public PollingThread(YarnClient yarnClient, ApplicationId appId) {
-			this.yarnClient = yarnClient;
-			this.appId = appId;
-		}
-
-		public void stopRunner() {
-			if (!running.get()) {
-				LOG.warn("Polling thread was already stopped");
-			}
-			running.set(false);
-		}
-
-		public ApplicationReport getLastReport() {
-			synchronized (lock) {
-				return lastReport;
-			}
-		}
-
-		@Override
-		public void run() {
-			while (running.get() && yarnClient.isInState(Service.STATE.STARTED)) {
-				try {
-					ApplicationReport report = yarnClient.getApplicationReport(appId);
-					synchronized (lock) {
-						lastReport = report;
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while getting application report", e);
-				}
-				try {
-					Thread.sleep(YarnClusterClient.POLLING_THREAD_INTERVAL_MS);
-				} catch (InterruptedException e) {
-					LOG.error("Polling thread got interrupted", e);
-					Thread.currentThread().interrupt(); // pass interrupt.
-					stopRunner();
-				}
-			}
-			if (running.get() && !yarnClient.isInState(Service.STATE.STARTED)) {
-				// == if the polling thread is still running but the yarn client is stopped.
-				LOG.warn("YARN client is unexpected in state " + yarnClient.getServiceState());
-			}
-		}
-	}
-
 	@Override
 	public boolean isDetached() {
 		return super.isDetached() || clusterDescriptor.isDetachedMode();

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index df4ef1f..5483758 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -31,8 +31,10 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
@@ -71,6 +73,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
 import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_ID;
@@ -86,7 +90,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
 	public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
 
-	private static final int CLIENT_POLLING_INTERVALL = 3;
+	private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
 
 	/** The id for the CommandLine interface. */
 	private static final String ID = "yarn-cluster";
@@ -99,6 +103,10 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	private static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split()
 
+	private static final String YARN_SESSION_HELP = "Available commands:\n" +
+		"help - show these commands\n" +
+		"stop - stop the YARN session";
+
 	//------------------------------------ Command Line argument options -------------------------
 	// the prefix transformation is used by the CliFrontend static constructor.
 	private final Option query;
@@ -419,104 +427,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		formatter.printHelp(" ", options);
 	}
 
-	private static void writeYarnProperties(Properties properties, File propertiesFile) {
-		try (final OutputStream out = new FileOutputStream(propertiesFile)) {
-			properties.store(out, "Generated YARN properties file");
-		} catch (IOException e) {
-			throw new RuntimeException("Error writing the properties file", e);
-		}
-		propertiesFile.setReadable(true, false); // readable for all.
-	}
-
-	public static void runInteractiveCli(YarnClusterClient yarnCluster, boolean readConsoleInput) {
-		final String help = "Available commands:\n" +
-				"help - show these commands\n" +
-				"stop - stop the YARN session";
-		int numTaskmanagers = 0;
-		try {
-			BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-			label:
-			while (true) {
-				// ------------------ check if there are updates by the cluster -----------
-
-				try {
-					GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-					LOG.debug("Received status message: {}", status);
-
-					if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
-						System.err.println("Number of connected TaskManagers changed to " +
-							status.numRegisteredTaskManagers() + ". " +
-							"Slots available: " + status.totalNumberOfSlots());
-						numTaskmanagers = status.numRegisteredTaskManagers();
-					}
-				} catch (Exception e) {
-					LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
-				}
-
-				List<String> messages = yarnCluster.getNewMessages();
-				if (messages != null && messages.size() > 0) {
-					System.err.println("New messages from the YARN cluster: ");
-					for (String msg : messages) {
-						System.err.println(msg);
-					}
-				}
-
-				if (yarnCluster.getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
-					System.err.println("The YARN cluster has failed");
-					yarnCluster.shutdown();
-				}
-
-				// wait until CLIENT_POLLING_INTERVAL is over or the user entered something.
-				long startTime = System.currentTimeMillis();
-				while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVALL * 1000
-						&& (!readConsoleInput || !in.ready())) {
-					Thread.sleep(200);
-				}
-				//------------- handle interactive command by user. ----------------------
-
-				if (readConsoleInput && in.ready()) {
-					String command = in.readLine();
-					switch (command) {
-						case "quit":
-						case "stop":
-							yarnCluster.shutdownCluster();
-							break label;
-
-						case "help":
-							System.err.println(help);
-							break;
-						default:
-							System.err.println("Unknown command '" + command + "'. Showing help: \n" + help);
-							break;
-					}
-				}
-
-				if (yarnCluster.hasBeenShutdown()) {
-					LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
-					break;
-				}
-			}
-		} catch (Exception e) {
-			LOG.warn("Exception while running the interactive command line interface", e);
-		}
-	}
-
-	public static void main(final String[] args) throws Exception {
-		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
-
-		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
-
-		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
-		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
-		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
-			@Override
-			public Integer call() {
-				return cli.run(args, flinkConfiguration, configurationDirectory);
-			}
-		});
-		System.exit(retCode);
-	}
-
 	@Override
 	public boolean isActive(CommandLine commandLine, Configuration configuration) {
 		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
@@ -660,7 +570,23 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 					"yarn application -kill " + applicationId.getOpt());
 				yarnCluster.disconnect();
 			} else {
-				runInteractiveCli(yarnCluster, true);
+				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+						yarnDescriptor.getYarnClient(),
+						yarnCluster.getApplicationId(),
+						new ScheduledExecutorServiceAdapter(scheduledExecutorService))) {
+					runInteractiveCli(
+						yarnCluster,
+						yarnApplicationStatusMonitor,
+						true);
+				} finally {
+					// shut down the scheduled executor service
+					ExecutorUtils.gracefulShutdown(
+						1000L,
+						TimeUnit.MILLISECONDS,
+						scheduledExecutorService);
+				}
 			}
 		} else {
 
@@ -717,7 +643,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 				yarnCluster.waitForClusterToBeReady();
 				yarnCluster.disconnect();
 			} else {
-				runInteractiveCli(yarnCluster, acceptInteractiveInput);
+
+				ScheduledThreadPoolExecutor scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+				try (YarnApplicationStatusMonitor yarnApplicationStatusMonitor = new YarnApplicationStatusMonitor(
+						yarnDescriptor.getYarnClient(),
+						yarnCluster.getApplicationId(),
+						new ScheduledExecutorServiceAdapter(scheduledExecutorService))){
+					runInteractiveCli(
+						yarnCluster,
+						yarnApplicationStatusMonitor,
+						acceptInteractiveInput);
+				} finally {
+					// shut down the scheduled executor service
+					ExecutorUtils.gracefulShutdown(
+						1000L,
+						TimeUnit.MILLISECONDS,
+						scheduledExecutorService);
+				}
 			}
 		}
 		return 0;
@@ -743,6 +686,144 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		System.out.println(message);
 	}
 
+	public static void main(final String[] args) throws Exception {
+		final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
+
+		final String configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
+
+		final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration();
+		SecurityUtils.install(new SecurityConfiguration(flinkConfiguration));
+		int retCode = SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+			@Override
+			public Integer call() {
+				return cli.run(args, flinkConfiguration, configurationDirectory);
+			}
+		});
+		System.exit(retCode);
+	}
+
+	private static void runInteractiveCli(
+		YarnClusterClient clusterClient,
+		YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+		boolean readConsoleInput) {
+		try (BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
+			boolean continueRepl = true;
+			int numTaskmanagers = 0;
+			boolean isLastStatusUnknown = true;
+			long unknownStatusSince = System.nanoTime();
+
+			while (continueRepl) {
+
+				final ApplicationStatus applicationStatus = yarnApplicationStatusMonitor.getApplicationStatusNow();
+
+				switch (applicationStatus) {
+					case FAILED:
+					case CANCELED:
+						System.err.println("The Flink Yarn cluster has failed.");
+						continueRepl = false;
+						break;
+					case UNKNOWN:
+						if (!isLastStatusUnknown) {
+							unknownStatusSince = System.nanoTime();
+							isLastStatusUnknown = true;
+						}
+
+						if ((System.nanoTime() - unknownStatusSince) > 5L * CLIENT_POLLING_INTERVAL_MS * 1_000_000L) {
+							System.err.println("The Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
+							continueRepl = false;
+						} else {
+							continueRepl = repStep(in, readConsoleInput);
+						}
+						break;
+					case SUCCEEDED:
+						if (isLastStatusUnknown) {
+							isLastStatusUnknown = false;
+						}
+
+						// ------------------ check if there are updates by the cluster -----------
+						try {
+							final GetClusterStatusResponse status = clusterClient.getClusterStatus();
+
+							if (status != null && numTaskmanagers != status.numRegisteredTaskManagers()) {
+								System.err.println("Number of connected TaskManagers changed to " +
+									status.numRegisteredTaskManagers() + ". " +
+									"Slots available: " + status.totalNumberOfSlots());
+								numTaskmanagers = status.numRegisteredTaskManagers();
+							}
+						} catch (Exception e) {
+							LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
+						}
+
+						printClusterMessages(clusterClient);
+
+						continueRepl = repStep(in, readConsoleInput);
+				}
+			}
+		} catch (Exception e) {
+			LOG.warn("Exception while running the interactive command line interface.", e);
+		}
+	}
+
+	private static void printClusterMessages(YarnClusterClient clusterClient) {
+		final List<String> messages = clusterClient.getNewMessages();
+		if (!messages.isEmpty()) {
+			System.err.println("New messages from the YARN cluster: ");
+			for (String msg : messages) {
+				System.err.println(msg);
+			}
+		}
+	}
+
+	/**
+	 * Read-Evaluate-Print step for the REPL.
+	 *
+	 * @param in to read from
+	 * @param readConsoleInput true if console input has to be read
+	 * @return true if the REPL shall be continued, otherwise false
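+	 * @throws IOException if checking for or reading the console input fails
+	 * @throws InterruptedException if the thread is interrupted while waiting for input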
+	 */
+	private static boolean repStep(
+		BufferedReader in,
+		boolean readConsoleInput) throws IOException, InterruptedException {
+
+		// wait until CLIENT_POLLING_INTERVAL_MS has elapsed or the user entered something.
+		long startTime = System.currentTimeMillis();
+		while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL_MS
+			&& (!readConsoleInput || !in.ready())) {
+			Thread.sleep(200L);
+		}
+		//------------- handle interactive command by user. ----------------------
+
+		if (readConsoleInput && in.ready()) {
+			String command = in.readLine();
+			switch (command) {
+				case "quit":
+				case "stop":
+					return false;
+
+				case "help":
+					System.err.println(YARN_SESSION_HELP);
+					break;
+				default:
+					System.err.println("Unknown command '" + command + "'. Showing help:");
+					System.err.println(YARN_SESSION_HELP);
+					break;
+			}
+		}
+
+		return true;
+	}
+
+	private static void writeYarnProperties(Properties properties, File propertiesFile) {
+		try (final OutputStream out = new FileOutputStream(propertiesFile)) {
+			properties.store(out, "Generated YARN properties file");
+		} catch (IOException e) {
+			throw new RuntimeException("Error writing the properties file", e);
+		}
+		propertiesFile.setReadable(true, false); // readable for all.
+	}
+
 	public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
 		if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) {
 			Map<String, String> properties = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/2ce5b98d/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
new file mode 100644
index 0000000..88d7747
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/YarnApplicationStatusMonitor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.cli;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class which monitors the specified yarn application status periodically.
+ */
+public class YarnApplicationStatusMonitor implements AutoCloseable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(YarnApplicationStatusMonitor.class);
+
+	private static final long UPDATE_INTERVAL = 1000L;
+
+	private final YarnClient yarnClient;
+
+	private final ApplicationId yarnApplicationId;
+
+	private final ScheduledFuture<?> applicationStatusUpdateFuture;
+
+	private volatile ApplicationStatus applicationStatus;
+
+	public YarnApplicationStatusMonitor(
+			YarnClient yarnClient,
+			ApplicationId yarnApplicationId,
+			ScheduledExecutor scheduledExecutor) {
+		this.yarnClient = Preconditions.checkNotNull(yarnClient);
+		this.yarnApplicationId = Preconditions.checkNotNull(yarnApplicationId);
+
+		applicationStatusUpdateFuture = scheduledExecutor.scheduleWithFixedDelay(
+			this::updateApplicationStatus,
+			UPDATE_INTERVAL,
+			UPDATE_INTERVAL,
+			TimeUnit.MILLISECONDS);
+
+		applicationStatus = ApplicationStatus.UNKNOWN;
+	}
+
+	public ApplicationStatus getApplicationStatusNow() {
+		return applicationStatus;
+	}
+
+	@Override
+	public void close() {
+		applicationStatusUpdateFuture.cancel(false);
+	}
+
+	private void updateApplicationStatus() {
+		if (yarnClient.isInState(Service.STATE.STARTED)) {
+			final ApplicationReport applicationReport;
+
+			try {
+				applicationReport = yarnClient.getApplicationReport(yarnApplicationId);
+			} catch (Exception e) {
+				LOG.info("Could not retrieve the Yarn application report for {}.", yarnApplicationId);
+				return;
+			}
+
+			YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
+
+			if (yarnApplicationState == YarnApplicationState.FAILED || yarnApplicationState == YarnApplicationState.KILLED) {
+				applicationStatus = ApplicationStatus.FAILED;
+			} else {
+				applicationStatus = ApplicationStatus.SUCCEEDED;
+			}
+		} else {
+			LOG.info("Yarn client is no longer in state STARTED. Stopping the Yarn application status monitor.");
+			applicationStatusUpdateFuture.cancel(false);
+		}
+	}
+}


[5/5] flink git commit: [hotfix] Clean up ExecutionGraph

Posted by tr...@apache.org.
[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7ee6033
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7ee6033
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7ee6033

Branch: refs/heads/master
Commit: d7ee60330ae91700d7b2c06eec863e3c4b092222
Parents: 86892b8
Author: gyao <ga...@data-artisans.com>
Authored: Wed Dec 20 14:44:26 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 17:01:41 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7ee6033/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index a02a687..202839c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -721,7 +721,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * Merges all accumulator results from the tasks previously executed in the Executions.
 	 * @return The accumulator map
 	 */
-	public Map<String, Accumulator<?,?>> aggregateUserAccumulators() {
+	public Map<String, Accumulator<?, ?>> aggregateUserAccumulators() {
 
 		Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<>();
 
@@ -738,7 +738,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/**
 	 * Gets the accumulator results.
 	 */
-	public Map<String, Object> getAccumulators() throws IOException {
+	public Map<String, Object> getAccumulators() {
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 


[3/5] flink git commit: [FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Posted by tr...@apache.org.
[FLINK-8332] [flip6] Move savepoint dispose into ClusterClient

Move the savepoint disposal logic from the CliFrontend into the ClusterClient. This gives
a better separation of concerns and allows the CliFrontend to be used with different
ClusterClient implementations.

This closes #5219.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2492e9b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2492e9b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2492e9b

Branch: refs/heads/master
Commit: c2492e9b220c6c9a64b47bcdc76a2194d9f4d669
Parents: 156b893
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Dec 18 18:59:30 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 16:59:43 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 192 ++++++--------
 .../flink/client/program/ClusterClient.java     |  35 +++
 .../flink/client/CliFrontendSavepointTest.java  | 257 +++++++------------
 .../flink/client/util/MockedCliFrontend.java    |   5 +
 4 files changed, 213 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2492e9b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index dff12f6..f7fcb60 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,9 +57,9 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -91,14 +91,10 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-
 /**
  * Implementation of a simple command line frontend for executing programs.
  */
@@ -659,39 +655,53 @@ public class CliFrontend {
 			return 0;
 		}
 
-		if (options.isDispose()) {
-			// Discard
-			return disposeSavepoint(options);
-		} else {
-			// Trigger
-			String[] cleanedArgs = options.getArgs();
-			JobID jobId;
+		CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(options.getCommandLine());
 
-			if (cleanedArgs.length >= 1) {
-				String jobIdString = cleanedArgs[0];
-				try {
-					jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
-				} catch (Exception e) {
-					return handleArgException(new IllegalArgumentException(
-							"Error: The value for the Job ID is not a valid ID."));
-				}
+		ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
+
+		try {
+			if (options.isDispose()) {
+				// Discard
+				return disposeSavepoint(clusterClient, options.getSavepointPath());
 			} else {
-				return handleArgException(new IllegalArgumentException(
+				// Trigger
+				String[] cleanedArgs = options.getArgs();
+				JobID jobId;
+
+				if (cleanedArgs.length >= 1) {
+					String jobIdString = cleanedArgs[0];
+					try {
+						jobId = JobID.fromHexString(jobIdString);
+					} catch (Exception e) {
+						return handleArgException(new IllegalArgumentException(
+							"Error: The value for the Job ID is not a valid ID."));
+					}
+				} else {
+					return handleArgException(new IllegalArgumentException(
 						"Error: The value for the Job ID is not a valid ID. " +
-								"Specify a Job ID to trigger a savepoint."));
-			}
+							"Specify a Job ID to trigger a savepoint."));
+				}
 
-			String savepointDirectory = null;
-			if (cleanedArgs.length >= 2) {
-				savepointDirectory = cleanedArgs[1];
-			}
+				String savepointDirectory = null;
+				if (cleanedArgs.length >= 2) {
+					savepointDirectory = cleanedArgs[1];
+				}
 
-			// Print superfluous arguments
-			if (cleanedArgs.length >= 3) {
-				logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
-			}
+				// Print superfluous arguments
+				if (cleanedArgs.length >= 3) {
+					logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
+				}
 
-			return triggerSavepoint(options, jobId, savepointDirectory);
+				return triggerSavepoint(clusterClient, jobId, savepointDirectory);
+			}
+		} catch (Exception e) {
+			return handleError(e);
+		} finally {
+			try {
+				clusterClient.shutdown();
+			} catch (Exception e) {
+				LOG.info("Could not shutdown the cluster client.", e);
+			}
 		}
 	}
 
@@ -699,88 +709,53 @@ public class CliFrontend {
 	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
 	 * message to the job manager.
 	 */
-	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory) {
-		try {
-			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
-			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
-			try {
-				logAndSysout("Triggering savepoint for job " + jobId + ".");
-				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, savepointDirectory);
-
-				String savepointPath;
-				try {
-					logAndSysout("Waiting for response...");
-					savepointPath = savepointPathFuture.get();
-				}
-				catch (ExecutionException ee) {
-					Throwable cause = ExceptionUtils.stripExecutionException(ee);
-					throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
-				}
-
-				logAndSysout("Savepoint completed. Path: " + savepointPath);
-				logAndSysout("You can resume your program from this savepoint with the run command.");
+	private int triggerSavepoint(ClusterClient clusterClient, JobID jobId, String savepointDirectory) throws Exception {
+		logAndSysout("Triggering savepoint for job " + jobId + ".");
+		CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId, savepointDirectory);
 
-				return 0;
-			}
-			finally {
-				client.shutdown();
-			}
+		String savepointPath;
+		try {
+			logAndSysout("Waiting for response...");
+			savepointPath = savepointPathFuture.get();
 		}
-		catch (Throwable t) {
-			return handleError(t);
+		catch (ExecutionException ee) {
+			Throwable cause = ExceptionUtils.stripExecutionException(ee);
+			throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.", cause);
 		}
+
+		logAndSysout("Savepoint completed. Path: " + savepointPath);
+		logAndSysout("You can resume your program from this savepoint with the run command.");
+
+		return 0;
 	}
 
 	/**
 	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
 	 * message to the job manager.
 	 */
-	private int disposeSavepoint(SavepointOptions options) {
-		try {
-			String savepointPath = options.getSavepointPath();
-			if (savepointPath == null) {
-				throw new IllegalArgumentException("Missing required argument: savepoint path. " +
-						"Usage: bin/flink savepoint -d <savepoint-path>");
-			}
-
-			ActorGateway jobManager = getJobManagerGateway(options);
+	private int disposeSavepoint(ClusterClient clusterClient, String savepointPath) {
+		Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path. " +
+			"Usage: bin/flink savepoint -d <savepoint-path>");
 
-			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
+		logAndSysout("Disposing savepoint '" + savepointPath + "'.");
 
-			Object msg = new DisposeSavepoint(savepointPath);
-			Future<Object> response = jobManager.ask(msg, clientTimeout);
+		final CompletableFuture<Acknowledge> disposeFuture;
+		try {
+			disposeFuture = clusterClient.disposeSavepoint(savepointPath, FutureUtils.toTime(clientTimeout));
+		} catch (Exception e) {
+			return handleError(new FlinkException("Could not dispose savepoint", e));
+		}
 
-			Object result;
-			try {
-				logAndSysout("Waiting for response...");
-				result = Await.result(response, clientTimeout);
-			} catch (Exception e) {
-				throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
-			}
+		logAndSysout("Waiting for response...");
 
-			if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
-				logAndSysout("Savepoint '" + savepointPath + "' disposed.");
-				return 0;
-			} else if (result instanceof DisposeSavepointFailure) {
-				DisposeSavepointFailure failure = (DisposeSavepointFailure) result;
-
-				if (failure.cause() instanceof ClassNotFoundException) {
-					throw new ClassNotFoundException("Savepoint disposal failed, because of a " +
-							"missing class. This is most likely caused by a custom state " +
-							"instance, which cannot be disposed without the user code class " +
-							"loader. Please provide the program jar with which you have created " +
-							"the savepoint via -j <JAR> for disposal.",
-							failure.cause().getCause());
-				} else {
-					throw failure.cause();
-				}
-			} else {
-				throw new IllegalStateException("Unknown JobManager response of type " +
-						result.getClass());
-			}
-		} catch (Throwable t) {
-			return handleError(t);
+		try {
+			disposeFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS);
+		} catch (Exception e) {
+			return handleError(e);
 		}
+
+		logAndSysout("Savepoint '" + savepointPath + "' disposed.");
+		return 0;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -883,19 +858,6 @@ public class CliFrontend {
 	}
 
 	/**
-	 * Retrieves the {@link ActorGateway} for the JobManager. The ClusterClient is retrieved
-	 * from the provided {@link CommandLineOptions}.
-	 *
-	 * @param options CommandLineOptions specifying the JobManager URL
-	 * @return Gateway to the JobManager
-	 * @throws Exception
-	 */
-	protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
-		logAndSysout("Retrieving JobManager.");
-		return retrieveClient(options).getJobManagerGateway();
-	}
-
-	/**
 	 * Creates a {@link ClusterClient} object from the given command line options and other parameters.
 	 * @param options Command line options
 	 * @param program The program for which to create the client.

http://git-wip-us.apache.org/repos/asf/flink/blob/c2492e9b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 3515363..b91927a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -691,6 +693,39 @@ public abstract class ClusterClient {
 		});
 	}
 
+	public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) throws Exception {
+		final ActorGateway jobManager = getJobManagerGateway();
+
+		Object msg = new JobManagerMessages.DisposeSavepoint(savepointPath);
+		CompletableFuture<Object> responseFuture = FutureUtils.toJava(
+			jobManager.ask(
+				msg,
+				FutureUtils.toFiniteDuration(timeout)));
+
+		return responseFuture.thenApply(
+			(Object response) -> {
+				if (response instanceof JobManagerMessages.DisposeSavepointSuccess$) {
+					return Acknowledge.get();
+				} else if (response instanceof JobManagerMessages.DisposeSavepointFailure) {
+					JobManagerMessages.DisposeSavepointFailure failureResponse = (JobManagerMessages.DisposeSavepointFailure) response;
+
+					if (failureResponse.cause() instanceof ClassNotFoundException) {
+						throw new CompletionException(
+							new ClassNotFoundException("Savepoint disposal failed, because of a " +
+								"missing class. This is most likely caused by a custom state " +
+								"instance, which cannot be disposed without the user code class " +
+								"loader. Please provide the program jar with which you have created " +
+								"the savepoint via -j <JAR> for disposal.",
+								failureResponse.cause().getCause()));
+					} else {
+						throw new CompletionException(failureResponse.cause());
+					}
+				} else {
+					throw new CompletionException(new FlinkRuntimeException("Unknown response type " + response.getClass().getSimpleName() + '.'));
+				}
+			});
+	}
+
 	/**
 	 * Lists the currently running and finished jobs on the cluster.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/c2492e9b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
index 1f0d356..8a3c870 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -19,32 +19,29 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.client.util.MockedCliFrontend;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
 
-import akka.dispatch.Futures;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.PrintStream;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
 import java.util.zip.ZipOutputStream;
 
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
-import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure;
-import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -60,7 +57,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for the SAVEPOINT command.
  */
-public class CliFrontendSavepointTest {
+public class CliFrontendSavepointTest extends TestLogger {
 
 	private static PrintStream stdOut;
 	private static PrintStream stdErr;
@@ -77,23 +74,26 @@ public class CliFrontendSavepointTest {
 	public void testTriggerSavepointSuccess() throws Exception {
 		replaceStdOutAndStdErr();
 
-		try {
-			JobID jobId = new JobID();
+		JobID jobId = new JobID();
 
-			String savepointPath = "expectedSavepointPath";
+		String savepointPath = "expectedSavepointPath";
 
-			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointPath);
+		final ClusterClient clusterClient = createClusterClient(savepointPath);
+
+		try {
+			MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
 
 			String[] parameters = { jobId.toString() };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
-			verify(frontend.client, times(1))
+			verify(clusterClient, times(1))
 				.triggerSavepoint(eq(jobId), isNull(String.class));
 
 			assertTrue(buffer.toString().contains(savepointPath));
 		}
 		finally {
+			clusterClient.shutdown();
 			restoreStdOutAndStdErr();
 		}
 	}
@@ -102,23 +102,27 @@ public class CliFrontendSavepointTest {
 	public void testTriggerSavepointFailure() throws Exception {
 		replaceStdOutAndStdErr();
 
-		try {
-			JobID jobId = new JobID();
+		JobID jobId = new JobID();
 
-			Exception testException = new Exception("expectedTestException");
+		String expectedTestException = "expectedTestException";
+		Exception testException = new Exception(expectedTestException);
 
-			MockedCliFrontend frontend = new SavepointTestCliFrontend(testException);
+		final ClusterClient clusterClient = createFailingClusterClient(testException);
+
+		try {
+			MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
 
 			String[] parameters = { jobId.toString() };
+
 			int returnCode = frontend.savepoint(parameters);
 
 			assertNotEquals(0, returnCode);
-			verify(frontend.client, times(1))
-				.triggerSavepoint(eq(jobId), isNull(String.class));
 
-			assertTrue(buffer.toString().contains("expectedTestException"));
+			assertTrue(buffer.toString().contains(expectedTestException));
 		}
 		finally {
+
+			clusterClient.shutdown();
 			restoreStdOutAndStdErr();
 		}
 	}
@@ -128,13 +132,15 @@ public class CliFrontendSavepointTest {
 		replaceStdOutAndStdErr();
 
 		try {
-			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			CliFrontend frontend = new MockedCliFrontend(new StandaloneClusterClient(
+				new Configuration(),
+				new TestingHighAvailabilityServices()));
 
 			String[] parameters = { "invalid job id" };
 			int returnCode = frontend.savepoint(parameters);
 
-			assertTrue(returnCode != 0);
 			assertTrue(buffer.toString().contains("not a valid ID"));
+			assertNotEquals(0, returnCode);
 		}
 		finally {
 			restoreStdOutAndStdErr();
@@ -149,23 +155,27 @@ public class CliFrontendSavepointTest {
 	public void testTriggerSavepointCustomTarget() throws Exception {
 		replaceStdOutAndStdErr();
 
-		try {
-			JobID jobId = new JobID();
+		JobID jobId = new JobID();
 
-			String savepointDirectory = "customTargetDirectory";
+		String savepointDirectory = "customTargetDirectory";
 
-			MockedCliFrontend frontend = new SavepointTestCliFrontend(savepointDirectory);
+		final ClusterClient clusterClient = createClusterClient(savepointDirectory);
+
+		try {
+			MockedCliFrontend frontend = new MockedCliFrontend(clusterClient);
 
 			String[] parameters = { jobId.toString(), savepointDirectory };
 			int returnCode = frontend.savepoint(parameters);
 
 			assertEquals(0, returnCode);
-			verify(frontend.client, times(1))
+			verify(clusterClient, times(1))
 				.triggerSavepoint(eq(jobId), eq(savepointDirectory));
 
 			assertTrue(buffer.toString().contains(savepointDirectory));
 		}
 		finally {
+			clusterClient.shutdown();
+
 			restoreStdOutAndStdErr();
 		}
 	}
@@ -178,65 +188,24 @@ public class CliFrontendSavepointTest {
 	public void testDisposeSavepointSuccess() throws Exception {
 		replaceStdOutAndStdErr();
 
-		try {
-			String savepointPath = "expectedSavepointPath";
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+		String savepointPath = "expectedSavepointPath";
 
-			when(jobManager.ask(
-					Mockito.eq(new DisposeSavepoint(savepointPath)),
-					any(FiniteDuration.class))).thenReturn(triggerResponse.future());
+		ClusterClient clusterClient = new DisposeSavepointClusterClient(
+			(String path, Time timeout) -> CompletableFuture.completedFuture(Acknowledge.get()));
 
-			triggerResponse.success(getDisposeSavepointSuccess());
+		try {
 
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			CliFrontend frontend = new MockedCliFrontend(clusterClient);
 
 			String[] parameters = { "-d", savepointPath };
-			int returnCode = frontend.savepoint(parameters);
-
-			assertEquals(0, returnCode);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new DisposeSavepoint(savepointPath)),
-					any(FiniteDuration.class));
+			frontend.savepoint(parameters);
 
 			String outMsg = buffer.toString();
 			assertTrue(outMsg.contains(savepointPath));
 			assertTrue(outMsg.contains("disposed"));
 		}
 		finally {
-			restoreStdOutAndStdErr();
-		}
-	}
-
-	/**
-	 * Tests that a disposal failure due a  ClassNotFoundException triggers a
-	 * note about the JAR option.
-	 */
-	@Test
-	public void testDisposeClassNotFoundException() throws Exception {
-		replaceStdOutAndStdErr();
-
-		try {
-			Future<Object> classNotFoundFailure = Futures
-					.<Object>successful(new DisposeSavepointFailure(new ClassNotFoundException("Test exception")));
-
-			ActorGateway jobManager = mock(ActorGateway.class);
-			when(jobManager.ask(any(DisposeSavepoint.class), any(FiniteDuration.class)))
-					.thenReturn(classNotFoundFailure);
-
-			CliFrontend frontend = new MockCliFrontend(CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { "-d", "any-path" };
-
-			int returnCode = frontend.savepoint(parameters);
-			assertTrue(returnCode != 0);
-
-			String out = buffer.toString();
-			assertTrue(out.contains("Please provide the program jar with which you have created " +
-					"the savepoint via -j <JAR> for disposal"));
-		} finally {
+			clusterClient.shutdown();
 			restoreStdOutAndStdErr();
 		}
 	}
@@ -248,23 +217,32 @@ public class CliFrontendSavepointTest {
 	public void testDisposeWithJar() throws Exception {
 		replaceStdOutAndStdErr();
 
-		try {
-			ActorGateway jobManager = mock(ActorGateway.class);
-			when(jobManager.ask(any(DisposeSavepoint.class), any(FiniteDuration.class)))
-					.thenReturn(Futures.successful(JobManagerMessages.getDisposeSavepointSuccess()));
+		final CompletableFuture<String> disposeSavepointFuture = new CompletableFuture<>();
 
-			CliFrontend frontend = new MockCliFrontend(CliFrontendTestUtils.getConfigDir(), jobManager);
+		final ClusterClient clusterClient = new DisposeSavepointClusterClient(
+			(String savepointPath, Time timeout) -> {
+				disposeSavepointFuture.complete(savepointPath);
+				return CompletableFuture.completedFuture(Acknowledge.get());
+			});
+
+		try {
+			CliFrontend frontend = new MockedCliFrontend(clusterClient);
 
 			// Fake JAR file
 			File f = tmp.newFile();
 			ZipOutputStream out = new ZipOutputStream(new FileOutputStream(f));
 			out.close();
 
-			String[] parameters = { "-d", "any-path", "-j", f.getAbsolutePath() };
+			final String disposePath = "any-path";
+			String[] parameters = { "-d", disposePath, "-j", f.getAbsolutePath() };
 
-			int returnCode = frontend.savepoint(parameters);
-			assertEquals(0, returnCode);
+			frontend.savepoint(parameters);
+
+			final String actualSavepointPath = disposeSavepointFuture.get();
+
+			assertEquals(disposePath, actualSavepointPath);
 		} finally {
+			clusterClient.shutdown();
 			restoreStdOutAndStdErr();
 		}
 	}
@@ -273,92 +251,44 @@ public class CliFrontendSavepointTest {
 	public void testDisposeSavepointFailure() throws Exception {
 		replaceStdOutAndStdErr();
 
-		try {
-			String savepointPath = "expectedSavepointPath";
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new DisposeSavepoint(savepointPath)),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
-			Exception testException = new Exception("expectedTestException");
+		String savepointPath = "expectedSavepointPath";
 
-			triggerResponse.success(new DisposeSavepointFailure(testException));
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
-
-			String[] parameters = { "-d", savepointPath };
-			int returnCode = frontend.savepoint(parameters);
-
-			assertTrue(returnCode != 0);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new DisposeSavepoint(savepointPath)),
-					any(FiniteDuration.class));
-
-			assertTrue(buffer.toString().contains("expectedTestException"));
-		}
-		finally {
-			restoreStdOutAndStdErr();
-		}
-	}
+		Exception testException = new Exception("expectedTestException");
 
-	@Test
-	public void testDisposeSavepointFailureUnknownResponse() throws Exception {
-		replaceStdOutAndStdErr();
+		ClusterClient clusterClient = new DisposeSavepointClusterClient((String path, Time timeout) -> FutureUtils.completedExceptionally(testException));
 
 		try {
-			String savepointPath = "expectedSavepointPath";
-			ActorGateway jobManager = mock(ActorGateway.class);
-
-			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
-
-			when(jobManager.ask(
-					Mockito.eq(new DisposeSavepoint(savepointPath)),
-					any(FiniteDuration.class)))
-					.thenReturn(triggerResponse.future());
-
-			triggerResponse.success("UNKNOWN RESPONSE");
-
-			CliFrontend frontend = new MockCliFrontend(
-					CliFrontendTestUtils.getConfigDir(), jobManager);
+			CliFrontend frontend = new MockedCliFrontend(clusterClient);
 
 			String[] parameters = { "-d", savepointPath };
+
 			int returnCode = frontend.savepoint(parameters);
 
-			assertTrue(returnCode != 0);
-			verify(jobManager, times(1)).ask(
-					Mockito.eq(new DisposeSavepoint(savepointPath)),
-					any(FiniteDuration.class));
+			assertNotEquals(0, returnCode);
 
-			String errMsg = buffer.toString();
-			assertTrue(errMsg.contains("IllegalStateException"));
-			assertTrue(errMsg.contains("Unknown JobManager response"));
+			assertTrue(buffer.toString().contains(testException.getMessage()));
 		}
 		finally {
+			clusterClient.shutdown();
 			restoreStdOutAndStdErr();
 		}
-
-		replaceStdOutAndStdErr();
 	}
 
 	// ------------------------------------------------------------------------
 
-	private static class MockCliFrontend extends CliFrontend {
+	private static final class DisposeSavepointClusterClient extends StandaloneClusterClient {
+
+		final BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction;
 
-		private final ActorGateway mockJobManager;
+		public DisposeSavepointClusterClient(BiFunction<String, Time, CompletableFuture<Acknowledge>> disposeSavepointFunction) throws Exception {
+			super(new Configuration(), new TestingHighAvailabilityServices());
 
-		public MockCliFrontend(String configDir, ActorGateway mockJobManager) throws Exception {
-			super(configDir);
-			this.mockJobManager = mockJobManager;
+			this.disposeSavepointFunction = Preconditions.checkNotNull(disposeSavepointFunction);
 		}
 
 		@Override
-		protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
-			return mockJobManager;
+		public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath, Time timeout) {
+			return disposeSavepointFunction.apply(savepointPath, timeout);
 		}
 	}
 
@@ -376,16 +306,21 @@ public class CliFrontendSavepointTest {
 		System.setErr(stdErr);
 	}
 
-	private static final class SavepointTestCliFrontend extends MockedCliFrontend {
+	private static ClusterClient createClusterClient(String expectedResponse) throws Exception {
+		final ClusterClient clusterClient = mock(ClusterClient.class);
 
-		SavepointTestCliFrontend(String expectedResponse) throws Exception {
-			when(client.triggerSavepoint(any(JobID.class), anyString()))
-				.thenReturn(CompletableFuture.completedFuture(expectedResponse));
-		}
+		when(clusterClient.triggerSavepoint(any(JobID.class), anyString()))
+			.thenReturn(CompletableFuture.completedFuture(expectedResponse));
 
-		SavepointTestCliFrontend(Exception expectedException) throws Exception {
-			when(client.triggerSavepoint(any(JobID.class), anyString()))
-				.thenReturn(FutureUtils.completedExceptionally(expectedException));
-		}
+		return clusterClient;
+	}
+
+	private static ClusterClient createFailingClusterClient(Exception expectedException) throws Exception {
+		final ClusterClient clusterClient = mock(ClusterClient.class);
+
+		when(clusterClient.triggerSavepoint(any(JobID.class), anyString()))
+			.thenReturn(FutureUtils.completedExceptionally(expectedException));
+
+		return clusterClient;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c2492e9b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
index c121c25..663746b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/util/MockedCliFrontend.java
@@ -39,6 +39,11 @@ import static org.powermock.api.mockito.PowerMockito.when;
 public class MockedCliFrontend extends CliFrontend {
 	public final ClusterClient client;
 
+	public MockedCliFrontend(ClusterClient clusterClient) throws Exception {
+		super(CliFrontendTestUtils.getConfigDir());
+		this.client = clusterClient;
+	}
+
 	protected MockedCliFrontend() throws Exception {
 		super(CliFrontendTestUtils.getConfigDir());
 		this.client = mock(ClusterClient.class);


[2/5] flink git commit: [FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Posted by tr...@apache.org.
[FLINK-8329] [flip6] Move YarnClient to AbstractYarnClusterDescriptor

Moves the YarnClient from the YarnClusterClient to the AbstractYarnClusterDescriptor.
This makes the latter responsible for the lifecycle management of the client and gives
a better separation of concerns.

This closes #5216.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/156b8935
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/156b8935
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/156b8935

Branch: refs/heads/master
Commit: 156b8935ef76eb53456cea1d40fd528ccefa21d8
Parents: 2ce5b98
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 20 16:43:21 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 16:59:29 2018 +0100

----------------------------------------------------------------------
 .../client/deployment/ClusterDescriptor.java    |   2 +-
 .../Flip6StandaloneClusterDescriptor.java       |   5 +
 .../deployment/StandaloneClusterDescriptor.java |   5 +
 ...CliFrontendYarnAddressConfigurationTest.java |  80 ++--
 .../flink/yarn/FlinkYarnSessionCliTest.java     |   6 +-
 .../yarn/TestingYarnClusterDescriptor.java      |   7 +-
 .../java/org/apache/flink/yarn/YARNITCase.java  |  47 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  81 ++--
 .../flink/yarn/YarnClusterDescriptorTest.java   |  94 ++--
 .../yarn/AbstractYarnClusterDescriptor.java     |  87 ++--
 .../apache/flink/yarn/YarnClusterClient.java    |  14 +-
 .../flink/yarn/YarnClusterDescriptor.java       |   9 +-
 .../flink/yarn/YarnClusterDescriptorV2.java     |   9 +-
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |   6 +-
 .../flink/yarn/YarnClusterDescriptorTest.java   | 451 ++++++++++---------
 15 files changed, 483 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index a62ceff..1603930 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 /**
  * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
  */
-public interface ClusterDescriptor<ClientType extends ClusterClient> {
+public interface ClusterDescriptor<ClientType extends ClusterClient> extends AutoCloseable {
 
 	/**
 	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...).

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index 9d88f59..b8eb534 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -60,4 +60,9 @@ public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestC
 	public RestClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Can't deploy a standalone FLIP-6 per-job cluster.");
 	}
+
+	@Override
+	public void close() throws Exception {
+		// nothing to do
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 51e267a..3808efa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -59,4 +59,9 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
 	public StandaloneClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
 	}
+
+	@Override
+	public void close() throws Exception {
+		// nothing to do
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index 1b457a5..56087a1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -38,7 +38,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.AfterClass;
@@ -375,12 +374,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 			private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
 
 				public TestingYarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
-					super(flinkConfiguration, configurationDirectory);
-				}
-
-				@Override
-				public YarnClient getYarnClient() {
-					return new TestYarnClient();
+					super(
+						flinkConfiguration,
+						configurationDirectory,
+						new TestYarnClient(finalApplicationStatus));
 				}
 
 				@Override
@@ -388,52 +385,51 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 						AbstractYarnClusterDescriptor descriptor,
 						int numberTaskManagers,
 						int slotsPerTaskManager,
-						YarnClient yarnClient,
 						ApplicationReport report,
 						Configuration flinkConfiguration,
 						boolean perJobCluster) throws IOException, YarnException {
 
 					return Mockito.mock(YarnClusterClient.class);
 				}
+			}
+		}
 
-				private class TestYarnClient extends YarnClientImpl {
-
-					private final List<ApplicationReport> reports = new LinkedList<>();
-
-					TestYarnClient() {
-						{   // a report that of our Yarn application we want to resume from
-							ApplicationReport report = Mockito.mock(ApplicationReport.class);
-							Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
-							Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
-							Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
-							Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
-							this.reports.add(report);
-						}
-						{   // a second report, just for noise
-							ApplicationReport report = Mockito.mock(ApplicationReport.class);
-							Mockito.when(report.getHost()).thenReturn("1.2.3.4");
-							Mockito.when(report.getRpcPort()).thenReturn(-123);
-							Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
-							Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
-							this.reports.add(report);
-						}
-					}
+		private static class TestYarnClient extends YarnClientImpl {
 
-					@Override
-					public List<ApplicationReport> getApplications() throws YarnException, IOException {
-						return reports;
-					}
+			private final List<ApplicationReport> reports = new LinkedList<>();
 
-					@Override
-					public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
-						for (ApplicationReport report : reports) {
-							if (report.getApplicationId().equals(appId)) {
-								return report;
-							}
-						}
-						throw new YarnException();
+			TestYarnClient(FinalApplicationStatus finalApplicationStatus) {
+				{   // the report of the Yarn application we want to resume from
+					ApplicationReport report = Mockito.mock(ApplicationReport.class);
+					Mockito.when(report.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
+					Mockito.when(report.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
+					Mockito.when(report.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
+					Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+					this.reports.add(report);
+				}
+				{   // a second report, just for noise
+					ApplicationReport report = Mockito.mock(ApplicationReport.class);
+					Mockito.when(report.getHost()).thenReturn("1.2.3.4");
+					Mockito.when(report.getRpcPort()).thenReturn(-123);
+					Mockito.when(report.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 0));
+					Mockito.when(report.getFinalApplicationStatus()).thenReturn(finalApplicationStatus);
+					this.reports.add(report);
+				}
+			}
+
+			@Override
+			public List<ApplicationReport> getApplications() throws YarnException, IOException {
+				return reports;
+			}
+
+			@Override
+			public ApplicationReport getApplicationReport(ApplicationId appId) throws YarnException, IOException {
+				for (ApplicationReport report : reports) {
+					if (report.getApplicationId().equals(appId)) {
+						return report;
 					}
 				}
+				throw new YarnException();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 8eef8f0..3fe8d2f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -174,7 +174,10 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 		private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
 			public JarAgnosticClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
-				super(flinkConfiguration, configurationDirectory);
+				super(
+					flinkConfiguration,
+					configurationDirectory,
+					YarnClient.createYarnClient());
 			}
 
 			@Override
@@ -202,7 +205,6 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 			super(descriptor,
 				numberTaskManagers,
 				slotsPerTaskManager,
-				Mockito.mock(YarnClient.class),
 				Mockito.mock(ApplicationReport.class),
 				config,
 				false);

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
index 30d2798..e66d2e0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.util.ArrayList;
@@ -36,7 +38,10 @@ import java.util.List;
 public class TestingYarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
 	public TestingYarnClusterDescriptor(Configuration configuration, String configurationDirectory) {
-		super(configuration, configurationDirectory);
+		super(
+			configuration,
+			configurationDirectory,
+			YarnClient.createYarnClient());
 		List<File> filesToShip = new ArrayList<>();
 
 		File testingJar = YarnTestBase.findFile("..", new TestJarFinder("flink-yarn-tests"));

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index bc28c5b..069f68a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -49,35 +50,43 @@ public class YARNITCase extends YarnTestBase {
 
 	@Ignore("The cluster cannot be stopped yet.")
 	@Test
-	public void testPerJobMode() {
+	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-		YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(configuration, System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR));
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 
-		yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+		try (final YarnClusterDescriptorV2 yarnClusterDescriptorV2 = new YarnClusterDescriptorV2(
+			configuration,
+			System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
+			yarnClient)) {
 
-		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(768)
-			.setTaskManagerMemoryMB(1024)
-			.setSlotsPerTaskManager(1)
-			.setNumberTaskManagers(1)
-			.createClusterSpecification();
+			yarnClusterDescriptorV2.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+			yarnClusterDescriptorV2.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(2);
+			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(768)
+				.setTaskManagerMemoryMB(1024)
+				.setSlotsPerTaskManager(1)
+				.setNumberTaskManagers(1)
+				.createClusterSpecification();
 
-		env.addSource(new InfiniteSource())
-			.shuffle()
-			.addSink(new DiscardingSink<Integer>());
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(2);
 
-		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+			env.addSource(new InfiniteSource())
+				.shuffle()
+				.addSink(new DiscardingSink<Integer>());
 
-		File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
+			final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+			File testingJar = YarnTestBase.findFile("..", new TestingYarnClusterDescriptor.TestJarFinder("flink-yarn-tests"));
 
-		YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+			jobGraph.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+
+			YarnClusterClient clusterClient = yarnClusterDescriptorV2.deployJobCluster(clusterSpecification, jobGraph);
+
+			clusterClient.shutdown();
+		}
 	}
 
 	private static class InfiniteSource implements ParallelSourceFunction<Integer> {

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index dd56f2f..ec8ef50 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -227,51 +227,56 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 
 		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
 		Configuration configuration = GlobalConfiguration.loadConfiguration();
-		AbstractYarnClusterDescriptor flinkYarnClient = new YarnClusterDescriptor(configuration, confDirPath);
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 
-		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(768)
-			.setTaskManagerMemoryMB(1024)
-			.setNumberTaskManagers(1)
-			.setSlotsPerTaskManager(1)
-			.createClusterSpecification();
+		try (final AbstractYarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
+			configuration,
+			confDirPath,
+			yarnClient)) {
+			Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
+			clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+			clusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 
-		// deploy
-		ClusterClient yarnCluster = null;
-		try {
-			yarnCluster = flinkYarnClient.deploySessionCluster(clusterSpecification);
-		} catch (Exception e) {
-			LOG.warn("Failing test", e);
-			Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
-		}
-		GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
-		for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+			final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(768)
+				.setTaskManagerMemoryMB(1024)
+				.setNumberTaskManagers(1)
+				.setSlotsPerTaskManager(1)
+				.createClusterSpecification();
+			// deploy
+			ClusterClient yarnCluster = null;
 			try {
-				Thread.sleep(1000);
-			} catch (InterruptedException e) {
-				LOG.warn("Interrupted", e);
-			}
-			GetClusterStatusResponse status = yarnCluster.getClusterStatus();
-			if (status != null && status.equals(expectedStatus)) {
-				LOG.info("ClusterClient reached status " + status);
-				break; // all good, cluster started
+				yarnCluster = clusterDescriptor.deploySessionCluster(clusterSpecification);
+			} catch (Exception e) {
+				LOG.warn("Failing test", e);
+				Assert.fail("Error while deploying YARN cluster: " + e.getMessage());
 			}
-			if (second > waitTime) {
-				// we waited for 15 seconds. cluster didn't come up correctly
-				Assert.fail("The custer didn't start after " + waitTime + " seconds");
+			GetClusterStatusResponse expectedStatus = new GetClusterStatusResponse(1, 1);
+			for (int second = 0; second < waitTime * 2; second++) { // run "forever"
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {
+					LOG.warn("Interrupted", e);
+				}
+				GetClusterStatusResponse status = yarnCluster.getClusterStatus();
+				if (status != null && status.equals(expectedStatus)) {
+					LOG.info("ClusterClient reached status " + status);
+					break; // all good, cluster started
+				}
+				if (second > waitTime) {
+					// we waited for 15 seconds. cluster didn't come up correctly
+					Assert.fail("The custer didn't start after " + waitTime + " seconds");
+				}
 			}
-		}
 
-		// use the cluster
-		Assert.assertNotNull(yarnCluster.getJobManagerAddress());
-		Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
+			// use the cluster
+			Assert.assertNotNull(yarnCluster.getJobManagerAddress());
+			Assert.assertNotNull(yarnCluster.getWebInterfaceURL());
 
-		LOG.info("Shutting down cluster. All tests passed");
-		// shutdown cluster
-		yarnCluster.shutdown();
+			LOG.info("Shutting down cluster. All tests passed");
+			// shutdown cluster
+			yarnCluster.shutdown();
+		}
 		LOG.info("Finished testJavaAPI()");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index f3e48c5..5144550 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -50,32 +51,40 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testExplicitLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
-		descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			temporaryFolder.getRoot().getAbsolutePath(),
+			YarnClient.createYarnClient());
 
-		File libFile = temporaryFolder.newFile("libFile.jar");
-		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+		try {
+			descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
+
+			File libFile = temporaryFolder.newFile("libFile.jar");
+			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
 
-		Assert.assertFalse(descriptor.shipFiles.contains(libFile));
-		Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
 
-		List<File> shipFiles = new ArrayList<>();
-		shipFiles.add(libFile);
-		shipFiles.add(libFolder);
+			List<File> shipFiles = new ArrayList<>();
+			shipFiles.add(libFile);
+			shipFiles.add(libFolder);
 
-		descriptor.addShipFiles(shipFiles);
+			descriptor.addShipFiles(shipFiles);
 
-		Assert.assertTrue(descriptor.shipFiles.contains(libFile));
-		Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+			Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+			Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
 
-		// only execute part of the deployment to test for shipped files
-		Set<File> effectiveShipFiles = new HashSet<>();
-		descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+			// only execute part of the deployment to test for shipped files
+			Set<File> effectiveShipFiles = new HashSet<>();
+			descriptor.addLibFolderToShipFiles(effectiveShipFiles);
 
-		Assert.assertEquals(0, effectiveShipFiles.size());
-		Assert.assertEquals(2, descriptor.shipFiles.size());
-		Assert.assertTrue(descriptor.shipFiles.contains(libFile));
-		Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+			Assert.assertEquals(0, effectiveShipFiles.size());
+			Assert.assertEquals(2, descriptor.shipFiles.size());
+			Assert.assertTrue(descriptor.shipFiles.contains(libFile));
+			Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
+		} finally {
+			descriptor.close();
+		}
 	}
 
 	/**
@@ -83,30 +92,37 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	 */
 	@Test
 	public void testEnvironmentLibShipping() throws Exception {
-		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(new Configuration(), temporaryFolder.getRoot().getAbsolutePath());
+		AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor(
+			new Configuration(),
+			temporaryFolder.getRoot().getAbsolutePath(),
+			YarnClient.createYarnClient());
 
-		File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
-		File libFile = new File(libFolder, "libFile.jar");
-		libFile.createNewFile();
-
-		Set<File> effectiveShipFiles = new HashSet<>();
-
-		final Map<String, String> oldEnv = System.getenv();
 		try {
-			Map<String, String> env = new HashMap<>(1);
-			env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
-			TestBaseUtils.setEnv(env);
-			// only execute part of the deployment to test for shipped files
-			descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+			File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
+			File libFile = new File(libFolder, "libFile.jar");
+			libFile.createNewFile();
+
+			Set<File> effectiveShipFiles = new HashSet<>();
+
+			final Map<String, String> oldEnv = System.getenv();
+			try {
+				Map<String, String> env = new HashMap<>(1);
+				env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
+				TestBaseUtils.setEnv(env);
+				// only execute part of the deployment to test for shipped files
+				descriptor.addLibFolderToShipFiles(effectiveShipFiles);
+			} finally {
+				TestBaseUtils.setEnv(oldEnv);
+			}
+
+			// only ship the folder itself, not its contents
+			Assert.assertFalse(effectiveShipFiles.contains(libFile));
+			Assert.assertTrue(effectiveShipFiles.contains(libFolder));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFile));
+			Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
 		} finally {
-			TestBaseUtils.setEnv(oldEnv);
+			descriptor.close();
 		}
-
-		// only add the ship the folder, not the contents
-		Assert.assertFalse(effectiveShipFiles.contains(libFile));
-		Assert.assertTrue(effectiveShipFiles.contains(libFolder));
-		Assert.assertFalse(descriptor.shipFiles.contains(libFile));
-		Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 86ddd9b..0372319 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
@@ -35,7 +36,6 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -108,14 +108,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private static final int MIN_JM_MEMORY = 768; // the minimum memory should be higher than the min heap cutoff
 	private static final int MIN_TM_MEMORY = 768;
 
-	private Configuration conf = new YarnConfiguration();
+	private final YarnConfiguration yarnConfiguration;
 
-	/**
-	 * If the user has specified a different number of slots, we store them here
-	 * Files (usually in a distributed file system) used for the YARN session of Flink.
-	 * Contains configuration files and jar files.
-	 */
-	private Path sessionFilesDir;
+	private final YarnClient yarnClient;
 
 	private String yarnQueue;
 
@@ -128,7 +123,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	/** Lazily initialized list of files to ship. */
 	protected List<File> shipFiles = new LinkedList<>();
 
-	private final org.apache.flink.configuration.Configuration flinkConfiguration;
+	private final Configuration flinkConfiguration;
 
 	private boolean detached;
 
@@ -143,31 +138,48 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	private YarnConfigOptions.UserJarInclusion userJarInclusion;
 
 	public AbstractYarnClusterDescriptor(
-		org.apache.flink.configuration.Configuration flinkConfiguration,
-		String configurationDirectory) {
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient) {
+
+		yarnConfiguration = new YarnConfiguration();
+
 		// for unit tests only
 		if (System.getenv("IN_TESTS") != null) {
 			try {
-				conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
+				yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml").toURI().toURL());
 			} catch (Throwable t) {
 				throw new RuntimeException("Error", t);
 			}
 		}
 
+		this.yarnClient = Preconditions.checkNotNull(yarnClient);
+		yarnClient.init(yarnConfiguration);
+		yarnClient.start();
+
 		this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
 		userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
 
 		this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
 	}
 
+	public YarnClient getYarnClient() {
+		return yarnClient;
+	}
+
 	/**
-	 * The class to bootstrap the application master of the Yarn cluster (runs main method).
+	 * The class to start the application master with. This class runs the main
+	 * method in case of a session cluster.
 	 */
 	protected abstract String getYarnSessionClusterEntrypoint();
 
+	/**
+	 * The class to start the application master with. This class runs the main
+	 * method in case of the job cluster.
+	 */
 	protected abstract String getYarnJobClusterEntrypoint();
 
-	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+	public Configuration getFlinkConfiguration() {
 		return flinkConfiguration;
 	}
 
@@ -257,7 +269,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// Check if we don't exceed YARN's maximum virtual cores.
 		// The number of cores can be configured in the config.
 		// If not configured, it is set to the number of task slots
-		int numYarnVcores = conf.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
+		int numYarnVcores = yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
 		int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
 		if (configuredVcores > numYarnVcores) {
@@ -304,21 +316,22 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		this.zookeeperNamespace = zookeeperNamespace;
 	}
 
-	/**
-	 * Gets a Hadoop Yarn client.
-	 * @return Returns a YarnClient which has to be shutdown manually
-	 */
-	public YarnClient getYarnClient() {
-		YarnClient yarnClient = YarnClient.createYarnClient();
-		yarnClient.init(conf);
-		yarnClient.start();
-		return yarnClient;
+	// -------------------------------------------------------------
+	// Lifecycle management
+	// -------------------------------------------------------------
+
+	@Override
+	public void close() {
+		yarnClient.stop();
 	}
 
+	// -------------------------------------------------------------
+	// ClusterClient overrides
+	// -------------------------------------------------------------
+
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 
-		YarnClient yarnClient = null;
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
 			if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -329,7 +342,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 
 			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
-			yarnClient = getYarnClient();
 			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
 
 			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -349,14 +361,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				this,
 				-1, // we don't know the number of task managers of a started Flink cluster
 				-1, // we don't know how many slots each task manager has for a started Flink cluster
-				yarnClient,
 				appReport,
 				flinkConfiguration,
 				false);
 		} catch (Exception e) {
-			if (null != yarnClient) {
-				yarnClient.stop();
-			}
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
 	}
@@ -414,8 +422,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 		isReadyForDeployment(clusterSpecification);
 
-		final YarnClient yarnClient = getYarnClient();
-
 		// ------------------ Check if the specified queue exists --------------------
 
 		checkYarnQueues(yarnClient);
@@ -442,7 +448,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
 		}
 
-		final int yarnMinAllocationMB = conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
+		final int yarnMinAllocationMB = yarnConfiguration.getInt("yarn.scheduler.minimum-allocation-mb", 0);
 
 		final ClusterSpecification validClusterSpecification;
 		try {
@@ -477,7 +483,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			this,
 			clusterSpecification.getNumberTaskManagers(),
 			clusterSpecification.getSlotsPerTaskManager(),
-			yarnClient,
 			report,
 			flinkConfiguration,
 			true);
@@ -627,7 +632,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// initialize file system
 		// Copy the application master jar to the filesystem
 		// Create a local resource to point to the destination jar path
-		final FileSystem fs = FileSystem.get(conf);
+		final FileSystem fs = FileSystem.get(yarnConfiguration);
 		final Path homeDir = fs.getHomeDirectory();
 
 		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
@@ -881,7 +886,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		if (UserGroupInformation.isSecurityEnabled()) {
 			// set HDFS delegation tokens when security is enabled
 			LOG.info("Adding delegation token to the AM container..");
-			Utils.setTokensFor(amContainer, paths, conf);
+			Utils.setTokensFor(amContainer, paths, yarnConfiguration);
 		}
 
 		amContainer.setLocalResources(localResources);
@@ -926,7 +931,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		// set classpath from YARN configuration
-		Utils.setupYarnClassPath(conf, appMasterEnv);
+		Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
 
 		amContainer.setEnvironment(appMasterEnv);
 
@@ -1196,7 +1201,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
 			PrintStream ps = new PrintStream(baos);
 
-			YarnClient yarnClient = getYarnClient();
 			YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
 
 			ps.append("NodeManagers in the ClusterClient " + metrics.getNumNodeManagers());
@@ -1223,7 +1227,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 				ps.println("Queue: " + q.getQueueName() + ", Current Capacity: " + q.getCurrentCapacity() + " Max Capacity: " +
 					q.getMaximumCapacity() + " Applications: " + q.getApplications().size());
 			}
-			yarnClient.stop();
 			return baos.toString();
 		} catch (Exception e) {
 			throw new RuntimeException("Couldn't get cluster description", e);
@@ -1411,7 +1414,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			failSessionDuringDeployment(yarnClient, yarnApplication);
 			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
-				FileSystem fs = FileSystem.get(conf);
+				FileSystem fs = FileSystem.get(yarnConfiguration);
 
 				if (!fs.delete(yarnFilesDir, true)) {
 					throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
@@ -1419,7 +1422,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 				fs.close();
 			} catch (IOException e) {
-				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
+				LOG.error("Failed to delete Flink Jar and configuration files in HDFS", e);
 			}
 		}
 	}
@@ -1525,7 +1528,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			AbstractYarnClusterDescriptor descriptor,
 			int numberTaskManagers,
 			int slotsPerTaskManager,
-			YarnClient yarnClient,
 			ApplicationReport report,
 			org.apache.flink.configuration.Configuration flinkConfiguration,
 			boolean perJobCluster) throws Exception {
@@ -1533,7 +1535,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			descriptor,
 			numberTaskManagers,
 			slotsPerTaskManager,
-			yarnClient,
 			report,
 			flinkConfiguration,
 			perJobCluster);

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index 80d0943..63421f9 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -43,7 +43,6 @@ import akka.util.Timeout;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,8 +66,6 @@ public class YarnClusterClient extends ClusterClient {
 
 	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
 
-	private YarnClient yarnClient;
-
 	private Thread clientShutdownHook = new ClientShutdownHook();
 
 	//---------- Class internal fields -------------------
@@ -93,7 +90,6 @@ public class YarnClusterClient extends ClusterClient {
 	 * @param clusterDescriptor The descriptor used at cluster creation
 	 * @param numberTaskManagers The number of task managers, -1 if unknown
 	 * @param slotsPerTaskManager Slots per task manager, -1 if unknown
-	 * @param yarnClient Client to talk to YARN
 	 * @param appReport the YARN application ID
 	 * @param flinkConfig Flink configuration
 	 * @param newlyCreatedCluster Indicator whether this cluster has just been created
@@ -104,7 +100,6 @@ public class YarnClusterClient extends ClusterClient {
 		final AbstractYarnClusterDescriptor clusterDescriptor,
 		final int numberTaskManagers,
 		final int slotsPerTaskManager,
-		final YarnClient yarnClient,
 		final ApplicationReport appReport,
 		Configuration flinkConfig,
 		boolean newlyCreatedCluster) throws Exception {
@@ -115,7 +110,6 @@ public class YarnClusterClient extends ClusterClient {
 		this.clusterDescriptor = clusterDescriptor;
 		this.numberTaskManagers = numberTaskManagers;
 		this.slotsPerTaskManager = slotsPerTaskManager;
-		this.yarnClient = yarnClient;
 		this.appReport = appReport;
 		this.appId = appReport.getApplicationId();
 		this.trackingURL = appReport.getTrackingUrl();
@@ -328,7 +322,7 @@ public class YarnClusterClient extends ClusterClient {
 			Future<Object> response =
 				Patterns.ask(applicationClient.get(),
 					new YarnMessages.LocalStopYarnSession(ApplicationStatus.CANCELED,
-							"Flink YARN Client requested shutdown"),
+						"Flink YARN Client requested shutdown"),
 					new Timeout(akkaDuration));
 			Await.ready(response, akkaDuration);
 		} catch (Exception e) {
@@ -349,7 +343,7 @@ public class YarnClusterClient extends ClusterClient {
 		}
 
 		try {
-			ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+			ApplicationReport appReport = clusterDescriptor.getYarnClient().getApplicationReport(appId);
 
 			LOG.info("Application " + appId + " finished with state " + appReport
 				.getYarnApplicationState() + " and final state " + appReport
@@ -368,10 +362,6 @@ public class YarnClusterClient extends ClusterClient {
 		} catch (Exception e) {
 			LOG.warn("Couldn't get final report", e);
 		}
-
-		LOG.info("YARN Client is shutting down");
-		yarnClient.stop(); // actorRunner is using the yarnClient.
-		yarnClient = null; // set null to clearly see if somebody wants to access it afterwards.
 	}
 
 	public boolean hasBeenShutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index 8759c3e..76f9154 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -22,13 +22,18 @@ import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
 /**
  * Default implementation of {@link AbstractYarnClusterDescriptor} which starts an {@link YarnApplicationMasterRunner}.
  */
 public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
 
-	public YarnClusterDescriptor(Configuration flinkConfiguration, String configurationDirectory) {
-		super(flinkConfiguration, configurationDirectory);
+	public YarnClusterDescriptor(
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnClient) {
+		super(flinkConfiguration, configurationDirectory, yarnClient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
index ed04523..6ce192c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptorV2.java
@@ -22,14 +22,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint;
 import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
 
+import org.apache.hadoop.yarn.client.api.YarnClient;
+
 /**
  * Implementation of {@link org.apache.flink.yarn.AbstractYarnClusterDescriptor} which is used to start the
  * new application master for a job under flip-6.
  */
 public class YarnClusterDescriptorV2 extends AbstractYarnClusterDescriptor {
 
-	public YarnClusterDescriptorV2(Configuration flinkConfiguration, String configurationDirectory) {
-		super(flinkConfiguration, configurationDirectory);
+	public YarnClusterDescriptorV2(
+			Configuration flinkConfiguration,
+			String configurationDirectory,
+			YarnClient yarnCLient) {
+		super(flinkConfiguration, configurationDirectory, yarnCLient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 5483758..c045082 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -50,6 +50,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -862,10 +863,11 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 	}
 
 	protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String configurationDirectory, boolean flip6) {
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 		if (flip6) {
-			return new YarnClusterDescriptorV2(configuration, configurationDirectory);
+			return new YarnClusterDescriptorV2(configuration, configurationDirectory, yarnClient);
 		} else {
-			return new YarnClusterDescriptor(configuration, configurationDirectory);
+			return new YarnClusterDescriptor(configuration, configurationDirectory, yarnClient);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/156b8935/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index c11c413..0d1bf65 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -59,9 +60,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
 	@Test
 	public void testFailIfTaskSlotsHigherThanMaxVcores() {
 
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			new Configuration(),
-			temporaryFolder.getRoot().getAbsolutePath());
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient);
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
@@ -81,6 +85,8 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			if (!(e.getCause() instanceof IllegalConfigurationException)) {
 				throw e;
 			}
+		} finally {
+			clusterDescriptor.close();
 		}
 	}
 
@@ -90,9 +96,12 @@ public class YarnClusterDescriptorTest extends TestLogger {
 		// overwrite vcores in config
 		configuration.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
 
+		final YarnClient yarnClient = YarnClient.createYarnClient();
+
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
-			temporaryFolder.getRoot().getAbsolutePath());
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient);
 
 		clusterDescriptor.setLocalJarPath(new Path(flinkJar.getPath()));
 
@@ -113,15 +122,19 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			if (!(e.getCause() instanceof IllegalConfigurationException)) {
 				throw e;
 			}
+		} finally {
+			clusterDescriptor.close();
 		}
 	}
 
 	@Test
 	public void testSetupApplicationMasterContainer() {
 		Configuration cfg = new Configuration();
+		final YarnClient yarnClient = YarnClient.createYarnClient();
 		YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			cfg,
-			temporaryFolder.getRoot().getAbsolutePath());
+			temporaryFolder.getRoot().getAbsolutePath(),
+			yarnClient);
 
 		final String java = "$JAVA_HOME/bin/java";
 		final String jvmmem = "-Xmx424m";
@@ -142,219 +155,223 @@ public class YarnClusterDescriptorTest extends TestLogger {
 			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err";
 		final int jobManagerMemory = 1024;
 
-		// no logging, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + // logging
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					false,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + // logging
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					false,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback only, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + logfile + " " + logback +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					false,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					false,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// log4j, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + logfile + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + logfile + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					false,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback + log4j, with/out krb5
-		assertEquals(
-			java + " " + jvmmem +
-				" " + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback + log4j, with/out krb5, different JVM opts
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
-		// because we have a reference to the ClusterDescriptor's configuration which we modify continuously
-		cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts +
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// logback + log4j, with/out krb5, different JVM opts
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
-		cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts + " " + jmJvmOpts +
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					false,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		assertEquals(
-			java + " " + jvmmem +
-				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
-				" " + logfile + " " + logback + " " + log4j +
-				" " + mainClass + " "  + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		// now try some configurations with different yarn.container-start-command-template
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
-		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-			"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
-		assertEquals(
-			java + " 1 " + jvmmem +
-				" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
-				" 3 " + logfile + " " + logback + " " + log4j +
-				" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
-
-		cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
-			"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
-		// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
-		assertEquals(
-			java +
-				" " + logfile + " " + logback + " " + log4j +
-				" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
-				" " + jvmmem +
-				" " + mainClass + " " + args + " " + redirects,
-			clusterDescriptor
-				.setupApplicationMasterContainer(
-					mainClass,
-					true,
-					true,
-					true,
-					jobManagerMemory)
-				.getCommands().get(0));
+		try {
+			// no logging, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + // logging
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						false,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + // logging
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						false,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback only, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + logfile + " " + logback +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						false,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						false,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// log4j, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + logfile + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + logfile + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						false,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback + log4j, with/out krb5
+			assertEquals(
+				java + " " + jvmmem +
+					" " + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback + log4j, with/out krb5, different JVM opts
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor,
+			// because we have a reference to the ClusterDescriptor's configuration which we modify continuously
+			cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts +
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// logback + log4j, with/out krb5, different JVM opts
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+			cfg.setString(CoreOptions.FLINK_JM_JVM_OPTIONS, jmJvmOpts);
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts + " " + jmJvmOpts +
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						false,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			assertEquals(
+				java + " " + jvmmem +
+					" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+					" " + logfile + " " + logback + " " + log4j +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			// now try some configurations with different yarn.container-start-command-template
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+			cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+				"%java% 1 %jvmmem% 2 %jvmopts% 3 %logging% 4 %class% 5 %args% 6 %redirects%");
+			assertEquals(
+				java + " 1 " + jvmmem +
+					" 2 " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+					" 3 " + logfile + " " + logback + " " + log4j +
+					" 4 " + mainClass + " 5 " + args + " 6 " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+
+			cfg.setString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
+				"%java% %logging% %jvmopts% %jvmmem% %class% %args% %redirects%");
+			// IMPORTANT: Be aware that we are using side effects here to modify the created YarnClusterDescriptor
+			assertEquals(
+				java +
+					" " + logfile + " " + logback + " " + log4j +
+					" " + jvmOpts + " " + jmJvmOpts + " " + krb5 + // jvmOpts
+					" " + jvmmem +
+					" " + mainClass + " " + args + " " + redirects,
+				clusterDescriptor
+					.setupApplicationMasterContainer(
+						mainClass,
+						true,
+						true,
+						true,
+						jobManagerMemory)
+					.getCommands().get(0));
+		} finally {
+			clusterDescriptor.close();
+		}
 	}
 }


[4/5] flink git commit: [FLINK-8233][flip6] Add JobExecutionResultHandler

Posted by tr...@apache.org.
[FLINK-8233][flip6] Add JobExecutionResultHandler

    - Allow retrieval of the JobResult cached in Dispatcher.
    - Implement serializer and deserializer for JobResult.

[FLINK-8233][flip6] Improve JobResultDeserializer and add tests

[FLINK-8233][flip6] Exclude null jobExecutionResult from serialization

[FLINK-8233][flip6] Add TestLogger to JobResultDeserializerTest

This closes #5194.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86892b8e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86892b8e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86892b8e

Branch: refs/heads/master
Commit: 86892b8e76a4e4b26cedf38c0695c53814a7f04f
Parents: c2492e9
Author: gyao <ga...@data-artisans.com>
Authored: Wed Dec 20 14:44:03 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 17:01:33 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/util/SerializedThrowable.java  |  25 ++-
 .../handler/job/JobExecutionResultHandler.java  |  97 +++++++++++
 .../messages/job/JobExecutionResultHeaders.java |  71 ++++++++
 .../job/JobExecutionResultResponseBody.java     |  82 ++++++++++
 .../messages/json/JobResultDeserializer.java    | 160 ++++++++++++++++++
 .../rest/messages/json/JobResultSerializer.java | 101 ++++++++++++
 .../json/SerializedThrowableDeserializer.java   |  57 +++++++
 .../json/SerializedThrowableSerializer.java     |  55 +++++++
 .../json/SerializedValueDeserializer.java       |  46 ++++++
 .../json/SerializedValueSerializer.java         |  51 ++++++
 .../rest/messages/queue/QueueStatus.java        |  63 ++++++++
 .../runtime/rest/util/RestMapperUtils.java      |   3 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |   9 ++
 .../job/JobExecutionResultHandlerTest.java      | 161 +++++++++++++++++++
 .../job/JobExecutionResultResponseBodyTest.java | 132 +++++++++++++++
 .../json/JobResultDeserializerTest.java         |  89 ++++++++++
 .../json/SerializedValueSerializerTest.java     |  82 ++++++++++
 17 files changed, 1280 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
index dab7cda..3cab55b 100644
--- a/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedThrowable.java
@@ -25,6 +25,8 @@ import java.lang.ref.WeakReference;
 import java.util.HashSet;
 import java.util.Set;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Utility class for dealing with user-defined Throwable types that are serialized (for
  * example during RPC/Actor communication), but cannot be resolved with the default
@@ -47,13 +49,12 @@ public class SerializedThrowable extends Exception implements Serializable {
 	/** The original stack trace, to be printed */
 	private final String fullStringifiedStackTrace;
 
-	/** The original exception, not transported via serialization, 
+	/** The original exception, not transported via serialization,
 	 * because the class may not be part of the system class loader.
 	 * In addition, we make sure our cached references to not prevent
 	 * unloading the exception class. */
 	private transient WeakReference<Throwable> cachedException;
 
-
 	/**
 	 * Create a new SerializedThrowable.
 	 * 
@@ -63,6 +64,18 @@ public class SerializedThrowable extends Exception implements Serializable {
 		this(exception, new HashSet<Throwable>());
 	}
 
+	/**
+	 * Creates a new SerializedThrowable from a serialized exception provided as a byte array.
+	 */
+	public SerializedThrowable(
+			final byte[] serializedException,
+			final String originalErrorClassName,
+			final String fullStringifiedStackTrace) {
+		this.serializedException = requireNonNull(serializedException);
+		this.originalErrorClassName = requireNonNull(originalErrorClassName);
+		this.fullStringifiedStackTrace = requireNonNull(fullStringifiedStackTrace);
+	}
+
 	private SerializedThrowable(Throwable exception, Set<Throwable> alreadySeen) {
 		super(getMessageOrError(exception));
 
@@ -136,6 +149,14 @@ public class SerializedThrowable extends Exception implements Serializable {
 		return originalErrorClassName;
 	}
 
+	public byte[] getSerializedException() {
+		return serializedException;
+	}
+
+	public String getFullStringifiedStackTrace() {
+		return fullStringifiedStackTrace;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Override the behavior of Throwable
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
new file mode 100644
index 0000000..5b6154c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/**
+ * Returns the {@link org.apache.flink.api.common.JobExecutionResult} for a given {@link JobID}.
+ */
+public class JobExecutionResultHandler
+	extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
+
+	public JobExecutionResultHandler(
+			final CompletableFuture<String> localRestAddress,
+			final GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			final Time timeout,
+			final Map<String, String> responseHeaders) {
+		super(
+			localRestAddress,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			JobExecutionResultHeaders.getInstance());
+	}
+
+	@Override
+	protected CompletableFuture<JobExecutionResultResponseBody> handleRequest(
+			@Nonnull final HandlerRequest<EmptyRequestBody, JobMessageParameters> request,
+			@Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+		final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+		return gateway.isJobExecutionResultPresent(jobId, timeout).thenCompose(present -> {
+				if (!present) {
+					return CompletableFuture.completedFuture(
+						JobExecutionResultResponseBody.inProgress());
+				} else {
+					return gateway.getJobExecutionResult(jobId, timeout)
+						.thenApply(JobExecutionResultResponseBody::created);
+				}
+			}
+		).exceptionally(throwable -> {
+			throw propagateException(throwable);
+		});
+	}
+
+	/** Maps "job not found" / "result gone" failures to HTTP 404; any other failure is re-wrapped unchanged. */
+	private static CompletionException propagateException(final Throwable throwable) {
+		final Throwable cause = ExceptionUtils.stripCompletionException(throwable);
+
+		if (cause instanceof JobExecutionResultGoneException
+			|| cause instanceof FlinkJobNotFoundException) {
+			// Build the 404 from the unwrapped cause so it carries the original message, not the wrapper's.
+			throw new CompletionException(new RestHandlerException(
+				cause.getMessage(), HttpResponseStatus.NOT_FOUND, cause));
+		} else {
+			throw new CompletionException(throwable);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java
new file mode 100644
index 0000000..a01a2d9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler}.
+ */
+public class JobExecutionResultHeaders
+	implements MessageHeaders<EmptyRequestBody, JobExecutionResultResponseBody, JobMessageParameters> {
+
+	private static final JobExecutionResultHeaders INSTANCE = new JobExecutionResultHeaders();
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<JobExecutionResultResponseBody> getResponseClass() {
+		return JobExecutionResultResponseBody.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return "/jobs/:" + JobIDPathParameter.KEY + "/execution-result";
+	}
+
+	public static JobExecutionResultHeaders getInstance() {
+		return INSTANCE;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java
new file mode 100644
index 0000000..76aab0f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBody.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} that carries a {@link QueueStatus} and a {@link JobResult}.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JobExecutionResultResponseBody implements ResponseBody {
+
+	@JsonProperty(value = "status", required = true)
+	private final QueueStatus status;
+
+	@JsonProperty(value = "job-execution-result")
+	@JsonSerialize(using = JobResultSerializer.class)
+	@JsonDeserialize(using = JobResultDeserializer.class)
+	@Nullable
+	private final JobResult jobExecutionResult;
+
+	@JsonCreator
+	public JobExecutionResultResponseBody(
+			@JsonProperty(value = "status", required = true) final QueueStatus status,
+			@JsonProperty(value = "job-execution-result")
+			@JsonDeserialize(using = JobResultDeserializer.class)
+			@Nullable final JobResult jobExecutionResult) {
+		this.status = requireNonNull(status);
+		this.jobExecutionResult = jobExecutionResult;
+	}
+
+	public static JobExecutionResultResponseBody inProgress() {
+		return new JobExecutionResultResponseBody(QueueStatus.inProgress(), null);
+	}
+
+	public static JobExecutionResultResponseBody created(
+			final JobResult jobExecutionResult) {
+		return new JobExecutionResultResponseBody(QueueStatus.completed(), jobExecutionResult);
+	}
+
+	public QueueStatus getStatus() {
+		return status;
+	}
+
+	@Nullable
+	public JobResult getJobExecutionResult() {
+		return jobExecutionResult;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
new file mode 100644
index 0000000..52bb43c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonToken;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * JSON deserializer for {@link JobResult}.
+ *
+ * @see JobResultSerializer
+ */
+public class JobResultDeserializer extends StdDeserializer<JobResult> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final JobIDDeserializer jobIdDeserializer = new JobIDDeserializer();
+
+	private final SerializedThrowableDeserializer serializedThrowableDeserializer =
+		new SerializedThrowableDeserializer();
+
+	private final SerializedValueDeserializer serializedValueDeserializer;
+
+	public JobResultDeserializer() {
+		super(JobResult.class);
+		final JavaType objectSerializedValueType = TypeFactory.defaultInstance()
+			.constructType(new TypeReference<SerializedValue<Object>>() {
+			});
+		serializedValueDeserializer = new SerializedValueDeserializer(objectSerializedValueType);
+	}
+
+	@Override
+	public JobResult deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
+		JobID jobId = null;
+		long netRuntime = -1;
+		SerializedThrowable serializedThrowable = null;
+		Map<String, SerializedValue<Object>> accumulatorResults = null;
+
+		while (true) {
+			final JsonToken jsonToken = p.nextToken();
+			assertNotEndOfInput(p, jsonToken);
+			if (jsonToken == JsonToken.END_OBJECT) {
+				break;
+			}
+
+			final String fieldName = p.getValueAsString();
+			switch (fieldName) {
+				case JobResultSerializer.FIELD_NAME_JOB_ID:
+					assertNextToken(p, JsonToken.VALUE_STRING);
+					jobId = jobIdDeserializer.deserialize(p, ctxt);
+					break;
+				case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
+					assertNextToken(p, JsonToken.VALUE_NUMBER_INT);
+					netRuntime = p.getLongValue();
+					break;
+				case JobResultSerializer.FIELD_NAME_ACCUMULATOR_RESULTS:
+					assertNextToken(p, JsonToken.START_OBJECT);
+					accumulatorResults = parseAccumulatorResults(p, ctxt);
+					break;
+				case JobResultSerializer.FIELD_NAME_FAILURE_CAUSE:
+					assertNextToken(p, JsonToken.START_OBJECT);
+					serializedThrowable = serializedThrowableDeserializer.deserialize(p, ctxt);
+					break;
+				default:
+					// ignore unknown fields: step onto the value and skip it, including nested objects/arrays
+					p.nextToken();
+					p.skipChildren();
+			}
+		}
+
+		try {
+			return new JobResult.Builder()
+				.jobId(jobId)
+				.netRuntime(netRuntime)
+				.accumulatorResults(accumulatorResults)
+				.serializedThrowable(serializedThrowable)
+				.build();
+		} catch (final RuntimeException e) {
+			// pass the parser (instead of null) so that the exception records the input location
+			throw new JsonMappingException(p, "Could not deserialize " + JobResult.class.getSimpleName(), e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private Map<String, SerializedValue<Object>> parseAccumulatorResults(
+			final JsonParser p,
+			final DeserializationContext ctxt) throws IOException {
+
+		final Map<String, SerializedValue<Object>> accumulatorResults = new HashMap<>();
+		while (true) {
+			final JsonToken jsonToken = p.nextToken();
+			assertNotEndOfInput(p, jsonToken);
+			if (jsonToken == JsonToken.END_OBJECT) {
+				break;
+			}
+			final String accumulatorName = p.getValueAsString();
+			p.nextValue();
+			accumulatorResults.put(
+				accumulatorName,
+				(SerializedValue<Object>) serializedValueDeserializer.deserialize(p, ctxt));
+		}
+		return accumulatorResults;
+	}
+
+	/**
+	 * Asserts that the provided JsonToken is not null, i.e., not at the end of the input.
+	 */
+	private static void assertNotEndOfInput(
+			final JsonParser p,
+			@Nullable final JsonToken jsonToken) {
+		checkState(jsonToken != null, "Unexpected end of input at %s", p.getCurrentLocation());
+	}
+
+	/**
+	 * Advances the token and asserts that it matches the required {@link JsonToken}.
+	 */
+	private static void assertNextToken(
+			final JsonParser p,
+			final JsonToken requiredJsonToken) throws IOException {
+		final JsonToken jsonToken = p.nextToken();
+		if (jsonToken != requiredJsonToken) {
+			throw new JsonMappingException(p, String.format("Expected token %s (was %s)", requiredJsonToken, jsonToken));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
new file mode 100644
index 0000000..a53716a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * JSON serializer for {@link JobResult}.
+ *
+ * @see JobResultDeserializer
+ */
+public class JobResultSerializer extends StdSerializer<JobResult> {
+
+	private static final long serialVersionUID = 1L;
+
+	static final String FIELD_NAME_JOB_ID = "id";
+
+	static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
+
+	static final String FIELD_NAME_ACCUMULATOR_RESULTS = "accumulator-results";
+
+	static final String FIELD_NAME_FAILURE_CAUSE = "failure-cause";
+
+	private final JobIDSerializer jobIdSerializer = new JobIDSerializer();
+
+	private final SerializedValueSerializer serializedValueSerializer;
+
+	private final SerializedThrowableSerializer serializedThrowableSerializer = new SerializedThrowableSerializer();
+
+	public JobResultSerializer() {
+		super(JobResult.class);
+
+		final JavaType objectSerializedValueType = TypeFactory.defaultInstance()
+			.constructType(new TypeReference<SerializedValue<Object>>() {
+			});
+		serializedValueSerializer = new SerializedValueSerializer(objectSerializedValueType);
+	}
+
+	@Override
+	public void serialize(
+			final JobResult result,
+			final JsonGenerator gen,
+			final SerializerProvider provider) throws IOException {
+
+		gen.writeStartObject();
+
+		gen.writeFieldName(FIELD_NAME_JOB_ID);
+		jobIdSerializer.serialize(result.getJobId(), gen, provider);
+
+		gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
+		gen.writeStartObject();
+		final Map<String, SerializedValue<Object>> accumulatorResults = result.getAccumulatorResults();
+		for (final Map.Entry<String, SerializedValue<Object>> nameValue : accumulatorResults.entrySet()) {
+			final String name = nameValue.getKey();
+			final SerializedValue<Object> value = nameValue.getValue();
+
+			gen.writeFieldName(name);
+			serializedValueSerializer.serialize(value, gen, provider);
+		}
+		gen.writeEndObject();
+
+		gen.writeNumberField(FIELD_NAME_NET_RUNTIME, result.getNetRuntime());
+
+		if (result.getSerializedThrowable().isPresent()) {
+			gen.writeFieldName(FIELD_NAME_FAILURE_CAUSE);
+
+			final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get();
+			serializedThrowableSerializer.serialize(serializedThrowable, gen, provider);
+		}
+
+		gen.writeEndObject();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
new file mode 100644
index 0000000..3217cce
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableDeserializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_CLASS;
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_SERIALIZED_EXCEPTION;
+import static org.apache.flink.runtime.rest.messages.json.SerializedThrowableSerializer.FIELD_NAME_STACK_TRACE;
+
+/**
+ * JSON deserializer for {@link SerializedThrowable}.
+ */
+public class SerializedThrowableDeserializer extends StdDeserializer<SerializedThrowable> {
+
+	private static final long serialVersionUID = 1L;
+
+	public SerializedThrowableDeserializer() {
+		super(SerializedThrowable.class);
+	}
+
+	@Override
+	public SerializedThrowable deserialize(
+			final JsonParser p,
+			final DeserializationContext ctxt) throws IOException {
+		final JsonNode root = p.readValueAsTree();
+
+		final String exceptionClassName = root.get(FIELD_NAME_CLASS).asText();
+		final String stackTrace = root.get(FIELD_NAME_STACK_TRACE).asText();
+		final byte[] serializedException = root.get(FIELD_NAME_SERIALIZED_EXCEPTION).binaryValue();
+		return new SerializedThrowable(serializedException, exceptionClassName, stackTrace);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
new file mode 100644
index 0000000..cb921a9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedThrowableSerializer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link SerializedThrowable}.
+ */
+public class SerializedThrowableSerializer extends StdSerializer<SerializedThrowable> {
+
+	private static final long serialVersionUID = 1L;
+
+	static final String FIELD_NAME_SERIALIZED_EXCEPTION = "serialized-exception";
+
+	static final String FIELD_NAME_CLASS = "class";
+
+	static final String FIELD_NAME_STACK_TRACE = "stack-trace";
+
+	public SerializedThrowableSerializer() {
+		super(SerializedThrowable.class);
+	}
+
+	@Override
+	public void serialize(final SerializedThrowable value, final JsonGenerator gen, final SerializerProvider provider) throws IOException {
+		gen.writeStartObject();
+		gen.writeStringField(FIELD_NAME_CLASS, value.getOriginalErrorClassName());
+		gen.writeStringField(FIELD_NAME_STACK_TRACE, value.getFullStringifiedStackTrace());
+		gen.writeBinaryField(FIELD_NAME_SERIALIZED_EXCEPTION, value.getSerializedException());
+		gen.writeEndObject();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
new file mode 100644
index 0000000..6a2eadb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * JSON deserializer for {@link SerializedValue}.
+ */
+public class SerializedValueDeserializer extends StdDeserializer<SerializedValue<?>> {
+
+	private static final long serialVersionUID = 1L;
+
+	public SerializedValueDeserializer(final JavaType valueType) {
+		super(valueType);
+	}
+
+	@Override
+	public SerializedValue<?> deserialize(final JsonParser p, final DeserializationContext ctxt) throws IOException {
+		return SerializedValue.fromBytes(p.getBinaryValue());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
new file mode 100644
index 0000000..0383d99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link SerializedValue}.
+ *
+ * <p>{@link SerializedValue}'s byte array will be base64 encoded.
+ */
+public class SerializedValueSerializer extends StdSerializer<SerializedValue<?>> {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates a serializer for the given type.
+	 *
+	 * @param javaType type that is passed on to the {@link StdSerializer} constructor
+	 */
+	public SerializedValueSerializer(final JavaType javaType) {
+		super(javaType);
+	}
+
+	/**
+	 * Writes the byte array of the given value as a JSON binary value.
+	 *
+	 * @param value serialized value whose bytes are written
+	 * @param gen generator the binary value is written to
+	 * @param provider serializer provider (not used)
+	 * @throws IOException if writing to the generator fails
+	 */
+	@Override
+	public void serialize(
+			final SerializedValue<?> value,
+			final JsonGenerator gen,
+			final SerializerProvider provider) throws IOException {
+		final byte[] serializedBytes = value.getByteArray();
+		gen.writeBinary(serializedBytes);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/queue/QueueStatus.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/queue/QueueStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/queue/QueueStatus.java
new file mode 100644
index 0000000..99aa571
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/queue/QueueStatus.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.queue;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response type for temporary queue resources, i.e., resources that are asynchronously created.
+ *
+ * <p>The status is carried in the JSON field {@code "id"}.
+ *
+ * @see org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler
+ */
+public class QueueStatus {
+
+	private static final String FIELD_NAME_STATUS = "id";
+
+	@JsonProperty(value = FIELD_NAME_STATUS, required = true)
+	private final StatusId statusId;
+
+	/**
+	 * Creates a queue status with the given status id.
+	 *
+	 * @param statusId status to report, must not be {@code null}
+	 * @throws NullPointerException if {@code statusId} is {@code null}
+	 */
+	@JsonCreator
+	public QueueStatus(
+		@JsonProperty(value = FIELD_NAME_STATUS, required = true) final StatusId statusId) {
+		this.statusId = requireNonNull(statusId, "statusId must not be null");
+	}
+
+	/**
+	 * Returns the status id this instance was created with.
+	 */
+	public StatusId getStatusId() {
+		return statusId;
+	}
+
+	/**
+	 * Creates a status with id {@link StatusId#COMPLETED}.
+	 */
+	public static QueueStatus completed() {
+		return new QueueStatus(StatusId.COMPLETED);
+	}
+
+	/**
+	 * Creates a status with id {@link StatusId#IN_PROGRESS}.
+	 */
+	public static QueueStatus inProgress() {
+		return new QueueStatus(StatusId.IN_PROGRESS);
+	}
+
+	/**
+	 * Defines queue statuses.
+	 */
+	public enum StatusId {
+		IN_PROGRESS,
+		COMPLETED
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
index 2412ccc..81b2507 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/util/RestMapperUtils.java
@@ -33,8 +33,7 @@ public class RestMapperUtils {
 		objectMapper.enable(
 			DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
 			DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES,
-			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY,
-			DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES);
+			DeserializationFeature.FAIL_ON_READING_DUP_TREE_KEY);
 		objectMapper.disable(
 			SerializationFeature.FAIL_ON_EMPTY_BEANS);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 703a754..a41f0fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
@@ -70,6 +71,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDet
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
@@ -338,6 +340,12 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			responseHeaders,
 			metricFetcher);
 
+		final JobExecutionResultHandler jobExecutionResultHandler = new JobExecutionResultHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
@@ -376,6 +384,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(SubtaskMetricsHeaders.getInstance(), subtaskMetricsHandler));
 		handlers.add(Tuple2.of(TaskManagerMetricsHeaders.getInstance(), taskManagerMetricsHandler));
 		handlers.add(Tuple2.of(JobManagerMetricsHeaders.getInstance(), jobManagerMetricsHandler));
+		handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
new file mode 100644
index 0000000..5bee78a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExecutionResultHandlerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link JobExecutionResultHandler}.
+ *
+ * <p>The handler is exercised against a mocked {@link RestfulGateway}: every test stubs the
+ * gateway's answers and then inspects the returned response body or the propagated exception.
+ */
+public class JobExecutionResultHandlerTest extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private JobExecutionResultHandler jobExecutionResultHandler;
+
+	@Mock
+	private RestfulGateway mockRestfulGateway;
+
+	private HandlerRequest<EmptyRequestBody, JobMessageParameters> testRequest;
+
+	@Before
+	public void setUp() throws Exception {
+		MockitoAnnotations.initMocks(this);
+
+		// Request carrying TEST_JOB_ID as the "jobid" path parameter.
+		testRequest = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			new JobMessageParameters(),
+			Collections.singletonMap("jobid", TEST_JOB_ID.toString()),
+			Collections.emptyMap());
+
+		// Retriever that always hands out the mocked gateway.
+		final GatewayRetriever<RestfulGateway> gatewayRetriever = new GatewayRetriever<RestfulGateway>() {
+			@Override
+			public CompletableFuture<RestfulGateway> getFuture() {
+				return CompletableFuture.completedFuture(mockRestfulGateway);
+			}
+		};
+
+		jobExecutionResultHandler = new JobExecutionResultHandler(
+			CompletableFuture.completedFuture("localhost:12345"),
+			gatewayRetriever,
+			Time.seconds(10),
+			Collections.emptyMap());
+	}
+
+	@Test
+	public void testResultInProgress() throws Exception {
+		when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(false));
+
+		final JobExecutionResultResponseBody response = handleTestRequest();
+
+		assertThat(response.getStatus().getStatusId(), equalTo(QueueStatus.StatusId.IN_PROGRESS));
+	}
+
+	@Test
+	public void testCompletedResult() throws Exception {
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(TEST_JOB_ID)
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+
+		when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(true));
+		when(mockRestfulGateway.getJobExecutionResult(any(JobID.class), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(jobResult));
+
+		final JobExecutionResultResponseBody response = handleTestRequest();
+
+		assertThat(response.getStatus().getStatusId(), equalTo(QueueStatus.StatusId.COMPLETED));
+		assertThat(response.getJobExecutionResult(), not(nullValue()));
+	}
+
+	@Test
+	public void testPropagateFlinkJobNotFoundExceptionAsRestHandlerException() throws Exception {
+		final Exception gatewayFailure =
+			new CompletionException(new FlinkJobNotFoundException(new JobID()));
+
+		assertPropagateAsRestHandlerException(gatewayFailure);
+	}
+
+	@Test
+	public void testPropagateJobExecutionResultGoneExceptionAsRestHandlerException() throws Exception {
+		final Exception gatewayFailure =
+			new CompletionException(new JobExecutionResultGoneException(new JobID()));
+
+		assertPropagateAsRestHandlerException(gatewayFailure);
+	}
+
+	/**
+	 * Lets the mocked gateway fail with the given exception and asserts that the handler
+	 * reports a {@link RestHandlerException} with status {@code NOT_FOUND}.
+	 */
+	private void assertPropagateAsRestHandlerException(final Exception exception) throws Exception {
+		when(mockRestfulGateway.isJobExecutionResultPresent(any(JobID.class), any(Time.class)))
+			.thenReturn(FutureUtils.completedExceptionally(exception));
+
+		try {
+			handleTestRequest();
+			fail("Expected exception not thrown");
+		} catch (final ExecutionException e) {
+			final Throwable strippedCause = ExceptionUtils.stripCompletionException(e.getCause());
+			assertThat(strippedCause, instanceOf(RestHandlerException.class));
+
+			final RestHandlerException restHandlerException = (RestHandlerException) strippedCause;
+			assertThat(restHandlerException.getHttpResponseStatus(), equalTo(HttpResponseStatus.NOT_FOUND));
+		}
+	}
+
+	/**
+	 * Sends the test request to the handler under test and waits for the response body.
+	 */
+	private JobExecutionResultResponseBody handleTestRequest() throws Exception {
+		return jobExecutionResultHandler.handleRequest(
+			testRequest,
+			mockRestfulGateway).get();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
new file mode 100644
index 0000000..d7dd7eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link JobExecutionResultResponseBody}.
+ *
+ * <p>Parameterized over three response bodies: a created result with a failure cause, a
+ * created result without one, and an in-progress response.
+ */
+@RunWith(Parameterized.class)
+public class JobExecutionResultResponseBodyTest
+	extends RestResponseMarshallingTestBase<JobExecutionResultResponseBody> {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private static final long TEST_NET_RUNTIME = Long.MAX_VALUE;
+
+	private static final byte[] TEST_ACCUMULATOR_VALUE = {1, 2, 3, 4, 5};
+
+	private static final String TEST_ACCUMULATOR_NAME = "test";
+
+	private static final Map<String, SerializedValue<Object>> TEST_ACCUMULATORS = Collections.singletonMap(
+		TEST_ACCUMULATOR_NAME,
+		SerializedValue.fromBytes(TEST_ACCUMULATOR_VALUE));
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() throws IOException {
+		return Arrays.asList(new Object[][] {
+			{JobExecutionResultResponseBody.created(new JobResult.Builder()
+				.jobId(TEST_JOB_ID)
+				.netRuntime(TEST_NET_RUNTIME)
+				.accumulatorResults(TEST_ACCUMULATORS)
+				.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
+				.build())},
+			{JobExecutionResultResponseBody.created(new JobResult.Builder()
+				.jobId(TEST_JOB_ID)
+				.netRuntime(TEST_NET_RUNTIME)
+				.accumulatorResults(TEST_ACCUMULATORS)
+				.build())},
+			{JobExecutionResultResponseBody.inProgress()}
+		});
+	}
+
+	private final JobExecutionResultResponseBody jobExecutionResultResponseBody;
+
+	public JobExecutionResultResponseBodyTest(
+			final JobExecutionResultResponseBody jobExecutionResultResponseBody) {
+		this.jobExecutionResultResponseBody = jobExecutionResultResponseBody;
+	}
+
+	@Override
+	protected Class<JobExecutionResultResponseBody> getTestResponseClass() {
+		return JobExecutionResultResponseBody.class;
+	}
+
+	@Override
+	protected JobExecutionResultResponseBody getTestResponseInstance() throws Exception {
+		return jobExecutionResultResponseBody;
+	}
+
+	/**
+	 * Compares the original response body with the one obtained after marshalling and
+	 * unmarshalling.
+	 *
+	 * @param expected response body before marshalling
+	 * @param actual response body after unmarshalling
+	 */
+	@Override
+	protected void assertOriginalEqualsToUnmarshalled(
+			final JobExecutionResultResponseBody expected,
+			final JobExecutionResultResponseBody actual) {
+
+		// Compare the status ids: the status objects themselves are distinct instances after
+		// the round trip, so they have to be compared by their enum value.
+		assertThat(actual.getStatus().getStatusId(), equalTo(expected.getStatus().getStatusId()));
+
+		final JobResult expectedJobExecutionResult = expected.getJobExecutionResult();
+		final JobResult actualJobExecutionResult = actual.getJobExecutionResult();
+
+		if (expectedJobExecutionResult != null) {
+			assertNotNull(actualJobExecutionResult);
+
+			assertThat(actualJobExecutionResult.getJobId(), equalTo(expectedJobExecutionResult.getJobId()));
+			assertThat(actualJobExecutionResult.getNetRuntime(), equalTo(expectedJobExecutionResult.getNetRuntime()));
+			assertThat(actualJobExecutionResult.getAccumulatorResults(), equalTo(expectedJobExecutionResult.getAccumulatorResults()));
+
+			// The failure cause is only checked when the original result carries one.
+			final Optional<SerializedThrowable> expectedFailureCauseOptional = expectedJobExecutionResult.getSerializedThrowable();
+			expectedFailureCauseOptional.ifPresent(expectedFailureCause -> {
+				final SerializedThrowable actualFailureCause = actualJobExecutionResult.getSerializedThrowable()
+					.orElseThrow(() -> new AssertionError("actualFailureCause is not available"));
+				assertThat(actualFailureCause.getFullStringifiedStackTrace(), equalTo(expectedFailureCause.getFullStringifiedStackTrace()));
+				assertThat(actualFailureCause.getOriginalErrorClassName(), equalTo(expectedFailureCause.getOriginalErrorClassName()));
+				assertArrayEquals(expectedFailureCause.getSerializedException(), actualFailureCause.getSerializedException());
+			});
+
+			if (expectedJobExecutionResult.getAccumulatorResults() != null) {
+				assertNotNull(actualJobExecutionResult.getAccumulatorResults());
+				assertArrayEquals(
+					expectedJobExecutionResult.getAccumulatorResults().get(TEST_ACCUMULATOR_NAME).getByteArray(),
+					actualJobExecutionResult.getAccumulatorResults().get(TEST_ACCUMULATOR_NAME).getByteArray());
+			}
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
new file mode 100644
index 0000000..e083baa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link JobResultDeserializer}.
+ */
+public class JobResultDeserializerTest extends TestLogger {
+
+	private ObjectMapper objectMapper;
+
+	@Before
+	public void setUp() {
+		final SimpleModule simpleModule = new SimpleModule();
+		simpleModule.addDeserializer(JobResult.class, new JobResultDeserializer());
+
+		objectMapper = new ObjectMapper();
+		objectMapper.registerModule(simpleModule);
+	}
+
+	/**
+	 * A document with all expected fields plus an unknown one must deserialize successfully.
+	 */
+	@Test
+	public void testDeserialization() throws Exception {
+		final JobResult jobResult = objectMapper.readValue("{\n" +
+			"\t\"id\": \"1bb5e8c7df49938733b7c6a73678de6a\",\n" +
+			"\t\"accumulator-results\": {},\n" +
+			"\t\"net-runtime\": 0,\n" +
+			"\t\"unknownfield\": \"foobar\"\n" +
+			"}", JobResult.class);
+
+		assertThat(jobResult.getJobId(), equalTo(JobID.fromHexString("1bb5e8c7df49938733b7c6a73678de6a")));
+		assertThat(jobResult.getNetRuntime(), equalTo(0L));
+		assertThat(jobResult.getAccumulatorResults().size(), equalTo(0));
+		assertThat(jobResult.getSerializedThrowable().isPresent(), equalTo(false));
+	}
+
+	/**
+	 * A string where a number is expected must be rejected with a {@link JsonMappingException}.
+	 */
+	@Test
+	public void testInvalidType() throws Exception {
+		try {
+			objectMapper.readValue("{\n" +
+				"\t\"id\": \"1bb5e8c7df49938733b7c6a73678de6a\",\n" +
+				"\t\"net-runtime\": \"invalid\"\n" +
+				"}", JobResult.class);
+			// Without this the test would pass silently if the invalid value were accepted.
+			fail("Expected JsonMappingException for invalid net-runtime type");
+		} catch (final JsonMappingException e) {
+			assertThat(e.getMessage(), containsString("Expected token VALUE_NUMBER_INT (was VALUE_STRING)"));
+		}
+	}
+
+	/**
+	 * A document that only contains the job id must be rejected with a
+	 * {@link JsonMappingException}.
+	 */
+	@Test
+	public void testIncompleteJobResult() throws Exception {
+		try {
+			objectMapper.readValue("{\n" +
+				"\t\"id\": \"1bb5e8c7df49938733b7c6a73678de6a\"\n" +
+				"}", JobResult.class);
+			// Without this the test would pass silently if the incomplete document were accepted.
+			fail("Expected JsonMappingException for incomplete JobResult");
+		} catch (final JsonMappingException e) {
+			assertThat(e.getMessage(), containsString("Could not deserialize JobResult"));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86892b8e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
new file mode 100644
index 0000000..87450dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SerializedValueSerializer} and {@link SerializedValueDeserializer}.
+ *
+ * <p>A value is written to JSON and read back through an {@link ObjectMapper} that has both
+ * classes registered.
+ */
+public class SerializedValueSerializerTest extends TestLogger {
+
+	private ObjectMapper objectMapper;
+
+	@Before
+	public void setUp() {
+		objectMapper = new ObjectMapper();
+
+		// Serializer and deserializer are both constructed for SerializedValue<?>.
+		final JavaType wildcardType = objectMapper
+			.getTypeFactory()
+			.constructType(new TypeReference<SerializedValue<?>>() {
+			});
+
+		final SimpleModule module = new SimpleModule();
+		module.addSerializer(new SerializedValueSerializer(wildcardType));
+		module.addDeserializer(
+			SerializedValue.class,
+			new SerializedValueDeserializer(wildcardType));
+		objectMapper.registerModule(module);
+	}
+
+	@Test
+	public void testSerializationDeserialization() throws Exception {
+		final String json = objectMapper.writeValueAsString(new SerializedValue<>(new TestClass()));
+
+		final SerializedValue<TestClass> roundTripped =
+			objectMapper.readValue(json, new TypeReference<SerializedValue<TestClass>>() {
+			});
+		final TestClass restored =
+			roundTripped.deserializeValue(ClassLoader.getSystemClassLoader());
+
+		assertEquals("baz", restored.foo);
+		assertEquals(1, restored.bar);
+	}
+
+	/**
+	 * Serializable payload with fixed field values that are checked after the round trip.
+	 */
+	private static class TestClass implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		private String foo = "baz";
+
+		private int bar = 1;
+
+	}
+
+}