You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/11 11:18:46 UTC

[1/4] flink git commit: [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

Repository: flink
Updated Branches:
  refs/heads/master 6c078c0e5 -> 2ee4d06ac


[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce a new JobExecutionResult class (renamed to JobResult below), used by
  JobMaster to forward the information held in the already existing
  org.apache.flink.api.common.JobExecutionResult.
- Always cache a JobExecutionResult, even in case of job failures. For failed
  jobs, the serialized exception is stored additionally.
- Add new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults.

[FLINK-8234][flip6] Rename JobExecutionResult -> JobResult

[FLINK-8234][flip6] Update MiniClusterJobDispatcher

Do not store job failure exception in a separate field because the JobResult
already contains the exception.

[FLINK-8234][flip6] Make JobResult Serializable

[FLINK-8234][flip6] Add Javadoc to JobResult builder

[FLINK-8234][flip6] Add Javadoc to JobResult#serializedThrowable

[FLINK-8234][flip6] Wrap JobResults in SoftReferences

Wrap instances of JobResult stored in JobExecutionResultCache in SoftReferences
so that the GC can free them according to memory demand.

[FLINK-8234][flip6] Fix checkstyle violations

[FLINK-8234][flip6] Add Javadoc to JobResult

This closes #5184.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08e550c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08e550c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08e550c9

Branch: refs/heads/master
Commit: 08e550c98674738ab883ea84c0350093c9765ab6
Parents: 8353123
Author: gyao <ga...@data-artisans.com>
Authored: Tue Dec 19 18:58:53 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 12:12:50 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  71 +++++++---
 .../dispatcher/JobExecutionResultCache.java     |  92 ++++++++++++
 .../entrypoint/JobClusterEntrypoint.java        |  12 +-
 .../runtime/jobmanager/OnCompletionActions.java |  10 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   7 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  24 ++--
 .../flink/runtime/jobmaster/JobResult.java      | 141 +++++++++++++++++++
 .../JobExecutionResultGoneException.java        |  36 +++++
 .../minicluster/MiniClusterJobDispatcher.java   |  41 ++++--
 .../runtime/webmonitor/RestfulGateway.java      |  45 ++++++
 .../runtime/dispatcher/DispatcherTest.java      |  67 +++++++++
 .../dispatcher/JobExecutionResultCacheTest.java |  93 ++++++++++++
 .../jobmaster/JobManagerRunnerMockTest.java     |  29 ++--
 .../flink/runtime/jobmaster/JobMasterTest.java  |   5 +-
 .../flink/runtime/jobmaster/JobResultTest.java  |  72 ++++++++++
 15 files changed, 680 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index ea3a6ad..299b315 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -39,10 +38,12 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -60,6 +61,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,6 +99,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final LeaderElectionService leaderElectionService;
 
+	private final JobExecutionResultCache jobExecutionResultCache = new JobExecutionResultCache();
+
 	@Nullable
 	protected final String restAddress;
 
@@ -357,6 +361,36 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
 	}
 
+	@Override
+	public CompletableFuture<JobResult> getJobExecutionResult(
+			final JobID jobId,
+			final Time timeout) {
+
+		final SoftReference<JobResult> jobResultRef = jobExecutionResultCache.get(jobId);
+		if (jobResultRef == null) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		} else {
+			final JobResult jobResult = jobResultRef.get();
+			if (jobResult == null) {
+				return FutureUtils.completedExceptionally(new JobExecutionResultGoneException(jobId));
+			} else {
+				return CompletableFuture.completedFuture(jobResult);
+			}
+		}
+	}
+
+	@Override
+	public CompletableFuture<Boolean> isJobExecutionResultPresent(
+			final JobID jobId,
+			final Time timeout) {
+
+		final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
+		if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		}
+		return CompletableFuture.completedFuture(jobExecutionResultPresent);
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.
@@ -549,38 +583,41 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	// Utility classes
 	//------------------------------------------------------
 
-	private class DispatcherOnCompleteActions implements OnCompletionActions {
+	@VisibleForTesting
+	class DispatcherOnCompleteActions implements OnCompletionActions {
 
 		private final JobID jobId;
 
-		private DispatcherOnCompleteActions(JobID jobId) {
+		DispatcherOnCompleteActions(JobID jobId) {
 			this.jobId = Preconditions.checkNotNull(jobId);
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			log.info("Job {} finished.", jobId);
 
 			runAsync(() -> {
-					try {
-						removeJob(jobId, true);
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-					}
-				});
+				jobExecutionResultCache.put(result);
+				try {
+					removeJob(jobId, true);
+				} catch (Exception e) {
+					log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+				}
+			});
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(JobResult result) {
 			log.info("Job {} failed.", jobId);
 
 			runAsync(() -> {
-					try {
-						removeJob(jobId, true);
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-					}
-				});
+				jobExecutionResultCache.put(result);
+				try {
+					removeJob(jobId, true);
+				} catch (Exception e) {
+					log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+				}
+			});
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
new file mode 100644
index 0000000..6d3dc55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.lang.ref.SoftReference;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link JobResult}s by their job id.
+ *
+ * <p>Entries are cached for a finite time. However, the JobResults are wrapped in
+ * {@link SoftReference}s so that the GC can free them according to memory demand.
+ */
+class JobExecutionResultCache {
+
+	private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+	private final Cache<JobID, SoftReference<JobResult>>
+		jobExecutionResultCache =
+		CacheBuilder.newBuilder()
+			.expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+			.build();
+
+	/**
+	 * Adds a {@link JobResult} to the cache.
+	 *
+	 * @param result The entry to be added to the cache.
+	 */
+	public void put(final JobResult result) {
+		assertJobExecutionResultNotCached(result.getJobId());
+		jobExecutionResultCache.put(result.getJobId(), new SoftReference<>(result));
+	}
+
+	/**
+	 * Returns {@code true} if the cache contains a {@link JobResult} for the specified
+	 * {@link JobID}.
+	 *
+	 * @param jobId The job id for which the presence of the {@link JobResult} should be tested.
+	 * @return {@code true} if the cache contains an entry, {@code false} otherwise
+	 */
+	public boolean contains(final JobID jobId) {
+		return jobExecutionResultCache.getIfPresent(jobId) != null;
+	}
+
+	/**
+	 * Returns a {@link SoftReference} to the {@link JobResult} for the specified job, and removes
+	 * the entry from the cache.
+	 *
+	 * @param jobId The job id of the {@link JobResult}.
+	 * @return A {@link SoftReference} to the {@link JobResult} for the job, or {@code null} if the
+	 * entry cannot be found in the cache.
+	 */
+	@Nullable
+	public SoftReference<JobResult> get(final JobID jobId) {
+		final SoftReference<JobResult> jobResultRef = jobExecutionResultCache.getIfPresent(jobId);
+		jobExecutionResultCache.invalidate(jobId);
+		return jobResultRef;
+	}
+
+	private void assertJobExecutionResultNotCached(final JobID jobId) {
+		checkState(
+			jobExecutionResultCache.getIfPresent(jobId) == null,
+			"jobExecutionResultCache already contained entry for job %s", jobId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 10ec659..21a7ba4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.entrypoint;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -60,6 +60,8 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Base class for per-job cluster entry points.
  */
@@ -287,15 +289,17 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			LOG.info("Job({}) finished.", jobId);
 
 			shutDownAndTerminate(true);
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
-			LOG.info("Job({}) failed.", jobId, cause);
+		public void jobFailed(JobResult result) {
+			checkArgument(result.getSerializedThrowable().isPresent());
+
+			LOG.info("Job({}) failed.", jobId, result.getSerializedThrowable().get().getMessage());
 
 			shutDownAndTerminate(false);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 17167f2..149ea0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.jobmaster.JobResult;
 
 /**
  * Interface for completion actions once a Flink job has reached
@@ -31,14 +31,14 @@ public interface OnCompletionActions {
 	 *
 	 * @param result of the job execution
 	 */
-	void jobFinished(JobExecutionResult result);
+	void jobFinished(JobResult result);
 
 	/**
-	 * Job failed with the given exception.
+	 * Job failed with an exception.
 	 *
-	 * @param cause of the job failure
+	 * @param result The result of the job carrying the failure cause.
 	 */
-	void jobFailed(Throwable cause);
+	void jobFailed(JobResult result);
 
 	/**
 	 * Job was finished by another JobMaster.

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index e699d6d..4833cbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -252,7 +251,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
-	public void jobFinished(JobExecutionResult result) {
+	public void jobFinished(JobResult result) {
 		try {
 			unregisterJobFromHighAvailability();
 			shutdownInternally();
@@ -268,14 +267,14 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
-	public void jobFailed(Throwable cause) {
+	public void jobFailed(JobResult result) {
 		try {
 			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {
 			if (toNotifyOnComplete != null) {
-				toNotifyOnComplete.jobFailed(cause);
+				toNotifyOnComplete.jobFailed(result);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b81a8c8..1de8e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -101,6 +100,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -970,6 +970,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
+		final JobResult.Builder builder = new JobResult.Builder()
+			.jobId(jobID)
+			.netRuntime(0);
 
 		if (newJobStatus.isGloballyTerminalState()) {
 			switch (newJobStatus) {
@@ -977,10 +980,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					try {
 						// TODO get correct job duration
 						// job done, let's get the accumulators
-						Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
-						JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
-
-						executor.execute(() -> jobCompletionActions.jobFinished(result));
+						final Map<String, SerializedValue<Object>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
+						builder.accumulatorResults(accumulatorsSerialized);
+						executor.execute(() -> jobCompletionActions.jobFinished(builder.build()));
 					}
 					catch (Exception e) {
 						log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
@@ -990,7 +992,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 								"The job is registered as 'FINISHED (successful), but this notification describes " +
 								"a failure, since the resulting accumulators could not be fetched.", e);
 
-						executor.execute(() ->jobCompletionActions.jobFailed(exception));
+						executor.execute(() -> jobCompletionActions.jobFailed(builder
+							.serializedThrowable(new SerializedThrowable(exception))
+							.build()));
 					}
 					break;
 
@@ -998,7 +1002,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					final JobExecutionException exception = new JobExecutionException(
 						jobID, "Job was cancelled.", new Exception("The job was cancelled"));
 
-					executor.execute(() -> jobCompletionActions.jobFailed(exception));
+					executor.execute(() -> jobCompletionActions.jobFailed(builder
+						.serializedThrowable(new SerializedThrowable(exception))
+						.build()));
 					break;
 				}
 
@@ -1006,7 +1012,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 					final JobExecutionException exception = new JobExecutionException(
 							jobID, "Job execution failed.", unpackedError);
-					executor.execute(() -> jobCompletionActions.jobFailed(exception));
+					executor.execute(() -> jobCompletionActions.jobFailed(builder
+						.serializedThrowable(new SerializedThrowable(exception))
+						.build()));
 					break;
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
new file mode 100644
index 0000000..4a409d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * <p>This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobResult implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final JobID jobId;
+
+	private final Map<String, SerializedValue<Object>> accumulatorResults;
+
+	private final long netRuntime;
+
+	/** Stores the cause of the job failure, or {@code null} if the job finished successfully. */
+	@Nullable
+	private final SerializedThrowable serializedThrowable;
+
+	private JobResult(
+			final JobID jobId,
+			final Map<String, SerializedValue<Object>> accumulatorResults,
+			final long netRuntime,
+			@Nullable final SerializedThrowable serializedThrowable) {
+
+		checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
+
+		this.jobId = requireNonNull(jobId);
+		this.accumulatorResults = requireNonNull(accumulatorResults);
+		this.netRuntime = netRuntime;
+		this.serializedThrowable = serializedThrowable;
+	}
+
+	/**
+	 * Returns {@code true} if the job finished successfully.
+	 */
+	public boolean isSuccess() {
+		return serializedThrowable == null;
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public Map<String, SerializedValue<Object>> getAccumulatorResults() {
+		return accumulatorResults;
+	}
+
+	public long getNetRuntime() {
+		return netRuntime;
+	}
+
+	/**
+	 * Returns an empty {@code Optional} if the job finished successfully, otherwise the
+	 * {@code Optional} will carry the failure cause.
+	 */
+	public Optional<SerializedThrowable> getSerializedThrowable() {
+		return Optional.ofNullable(serializedThrowable);
+	}
+
+	/**
+	 * Builder for {@link JobResult}.
+	 */
+	@Internal
+	public static class Builder {
+
+		private JobID jobId;
+
+		private Map<String, SerializedValue<Object>> accumulatorResults;
+
+		private long netRuntime = -1;
+
+		private SerializedThrowable serializedThrowable;
+
+		public Builder jobId(final JobID jobId) {
+			this.jobId = jobId;
+			return this;
+		}
+
+		public Builder accumulatorResults(final Map<String, SerializedValue<Object>> accumulatorResults) {
+			this.accumulatorResults = accumulatorResults;
+			return this;
+		}
+
+		public Builder netRuntime(final long netRuntime) {
+			this.netRuntime = netRuntime;
+			return this;
+		}
+
+		public Builder serializedThrowable(final SerializedThrowable serializedThrowable) {
+			this.serializedThrowable = serializedThrowable;
+			return this;
+		}
+
+		public JobResult build() {
+			return new JobResult(
+				jobId,
+				accumulatorResults == null ? Collections.emptyMap() : accumulatorResults,
+				netRuntime,
+				serializedThrowable);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
new file mode 100644
index 0000000..d73b3a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception indicating that the required {@link org.apache.flink.runtime.jobmaster.JobResult} was
+ * garbage collected.
+ * @see org.apache.flink.runtime.dispatcher.JobExecutionResultCache
+ */
+public class JobExecutionResultGoneException extends FlinkException {
+
+	private static final long serialVersionUID = 1L;
+
+	public JobExecutionResultGoneException(JobID jobId) {
+		super(String.format("Job execution result for job [%s] is gone.", jobId));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 3a1474d..b9d76da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,6 +41,7 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -358,12 +361,12 @@ public class MiniClusterJobDispatcher {
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			decrementCheckAndCleanup();
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(JobResult result) {
 			decrementCheckAndCleanup();
 		}
 
@@ -401,11 +404,9 @@ public class MiniClusterJobDispatcher {
 
 		private final CountDownLatch jobMastersToWaitFor;
 
-		private volatile Throwable jobException;
-
 		private volatile Throwable runnerException;
 
-		private volatile JobExecutionResult result;
+		private volatile JobResult result;
 		
 		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
 			this.jobId = jobId;
@@ -413,14 +414,16 @@ public class MiniClusterJobDispatcher {
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult jobResult) {
-			this.result = jobResult;
+		public void jobFinished(JobResult result) {
+			this.result = result;
 			jobMastersToWaitFor.countDown();
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
-			jobException = cause;
+		public void jobFailed(JobResult result) {
+			checkArgument(result.getSerializedThrowable().isPresent());
+
+			this.result = result;
 			jobMastersToWaitFor.countDown();
 		}
 
@@ -439,9 +442,8 @@ public class MiniClusterJobDispatcher {
 		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
 			jobMastersToWaitFor.await();
 
-			final Throwable jobFailureCause = this.jobException;
 			final Throwable runnerException = this.runnerException;
-			final JobExecutionResult result = this.result;
+			final JobResult result = this.result;
 
 			// (1) we check if the job terminated with an exception
 			// (2) we check whether the job completed successfully
@@ -449,7 +451,11 @@ public class MiniClusterJobDispatcher {
 			//     completed successfully in that case, if multiple JobMasters were running
 			//     and other took over. only if all encounter a fatal error, the job cannot finish
 
-			if (jobFailureCause != null) {
+			if (result != null && !result.isSuccess()) {
+				checkState(result.getSerializedThrowable().isPresent());
+				final Throwable jobFailureCause = result.getSerializedThrowable()
+					.get()
+					.deserializeError(ClassLoader.getSystemClassLoader());
 				if (jobFailureCause instanceof JobExecutionException) {
 					throw (JobExecutionException) jobFailureCause;
 				}
@@ -458,7 +464,16 @@ public class MiniClusterJobDispatcher {
 				}
 			}
 			else if (result != null) {
-				return result;
+				try {
+					return new JobExecutionResult(
+						jobId,
+						result.getNetRuntime(),
+						AccumulatorHelper.deserializeAccumulators(
+							result.getAccumulatorResults(),
+							ClassLoader.getSystemClassLoader()));
+				} catch (final IOException | ClassNotFoundException e) {
+					throw new JobExecutionException(result.getJobId(), e);
+				}
 			}
 			else if (runnerException != null) {
 				throw new JobExecutionException(jobId,

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 61dca3b..0f26d3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -23,7 +23,9 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
@@ -92,4 +94,47 @@ public interface RestfulGateway extends RpcGateway {
 	 * @return Future containing the collection of instance ids and the corresponding metric query service path
 	 */
 	CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+	/**
+	 * Returns the {@link JobResult} of a job; for a failed job it carries the failure cause.
+	 *
+	 * @param jobId ID of the job that we are interested in.
+	 * @param timeout Timeout for the asynchronous operation.
+	 *
+	 * @see #isJobExecutionResultPresent(JobID, Time)
+	 *
+	 * @return {@link CompletableFuture} containing the {@link JobResult}. The future is completed
+	 * exceptionally with:
+	 * <ul>
+	 * 	<li>{@link FlinkJobNotFoundException} if there is no result, or if the result has
+	 * 	expired
+	 * 	<li>{@link JobExecutionResultGoneException} if the result was removed due to memory demand.
+	 * </ul>
+	 */
+	default CompletableFuture<JobResult> getJobExecutionResult(
+			JobID jobId,
+			@RpcTimeout Time timeout) {
+		throw new UnsupportedOperationException();
+	}
+
+	/**
+	 * Tests if the {@link JobResult} is present.
+	 *
+	 * @param jobId ID of the job that we are interested in.
+	 * @param timeout Timeout for the asynchronous operation.
+	 *
+	 * @see #getJobExecutionResult(JobID, Time)
+	 *
+	 * @return {@link CompletableFuture} containing {@code true} when the
+	 * {@link JobResult} is present. The future is completed exceptionally with:
+	 * <ul>
+	 * 	<li>{@link FlinkJobNotFoundException} if there is no job running with the specified ID, or
+	 * 	if the result has expired
+	 * 	<li>{@link JobExecutionResultGoneException} if the result was removed due to memory demand.
+	 * </ul>
+	 */
+	default CompletableFuture<Boolean> isJobExecutionResultPresent(
+			JobID jobId, @RpcTimeout Time timeout) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index b75ae06..b5fcd18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -37,9 +37,11 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -51,6 +53,8 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -67,11 +71,14 @@ import org.mockito.Mockito;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -249,6 +256,66 @@ public class DispatcherTest extends TestLogger {
 		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1));
 	}
 
+	/**
+	 * Test that {@link JobResult} is cached when the job finishes.
+	 */
+	@Test
+	public void testCacheJobExecutionResult() throws Exception {
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		OnCompletionActions onCompletionActions;
+
+		final JobID failedJobId = new JobID();
+		onCompletionActions = dispatcher.new DispatcherOnCompleteActions(failedJobId);
+
+		onCompletionActions.jobFailed(new JobResult.Builder()
+			.jobId(failedJobId)
+			.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
+			.netRuntime(Long.MAX_VALUE)
+			.build());
+
+		assertThat(
+			dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(),
+			equalTo(true));
+		assertThat(
+			dispatcherGateway.getJobExecutionResult(failedJobId, TIMEOUT)
+				.get()
+				.isSuccess(),
+			equalTo(false));
+
+		final JobID successJobId = new JobID();
+		onCompletionActions = dispatcher.new DispatcherOnCompleteActions(successJobId);
+
+		onCompletionActions.jobFinished(new JobResult.Builder()
+			.jobId(successJobId)
+			.netRuntime(Long.MAX_VALUE)
+			.build());
+
+		assertThat(
+			dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(),
+			equalTo(true));
+		assertThat(
+			dispatcherGateway.getJobExecutionResult(successJobId, TIMEOUT)
+				.get()
+				.isSuccess(),
+			equalTo(true));
+	}
+
+	/** Tests that requesting the result of an unknown job fails with {@link FlinkJobNotFoundException}. */
+	@Test
+	public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		try {
+			dispatcherGateway.getJobExecutionResult(new JobID(), TIMEOUT).get();
+			org.junit.Assert.fail("Expected exception not thrown."); // otherwise a normally completing future would pass
+		} catch (ExecutionException e) {
+			assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(FlinkJobNotFoundException.class));
+		}
+	}
+
 	private static class TestingDispatcher extends Dispatcher {
 
 		private final JobID expectedJobId;

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
new file mode 100644
index 0000000..dfc059c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.lang.ref.SoftReference;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link JobExecutionResultCache}.
+ */
+@Category(Flip6.class)
+public class JobExecutionResultCacheTest extends TestLogger {
+
+	private JobExecutionResultCache jobExecutionResultCache;
+
+	@Before
+	public void setUp() {
+		jobExecutionResultCache = new JobExecutionResultCache();
+	}
+
+	@Test
+	public void testCacheResultUntilRetrieved() {
+		final JobID jobId = new JobID();
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(jobId)
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+		jobExecutionResultCache.put(jobResult);
+
+		assertThat(jobExecutionResultCache.contains(jobId), equalTo(true));
+
+		SoftReference<JobResult> jobResultRef;
+		jobResultRef = jobExecutionResultCache.get(jobId);
+
+		assertThat(jobResultRef, notNullValue());
+		assertThat(jobResultRef.get(), sameInstance(jobResult));
+
+		assertThat(jobExecutionResultCache.contains(jobId), equalTo(false));
+
+		jobResultRef = jobExecutionResultCache.get(jobId);
+		assertThat(jobResultRef, nullValue());
+	}
+
+	@Test
+	public void testThrowExceptionIfEntryAlreadyExists() {
+		final JobID jobId = new JobID();
+		final JobResult build = new JobResult.Builder()
+			.jobId(jobId)
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+		jobExecutionResultCache.put(build);
+
+		try {
+			jobExecutionResultCache.put(build);
+			fail("Expected exception not thrown.");
+		} catch (final IllegalStateException e) {
+			assertThat(e.getMessage(), containsString("already contained entry for job"));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index a0959c0..245ea27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -33,12 +32,14 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -101,7 +102,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
 
 		blobStore = mock(BlobStore.class);
-		
+
 		HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
 		when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
 		when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore);
@@ -138,7 +139,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFailed());
 
 		verify(jobManager).start(any(JobMasterId.class), any(Time.class));
-		
+
 		runner.shutdown();
 		verify(leaderElectionService).stop();
 		verify(jobManager).shutDown();
@@ -175,7 +176,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is finished
-		runner.jobFinished(mock(JobExecutionResult.class));
+		runner.jobFinished(mock(JobResult.class));
 
 		assertTrue(jobCompletion.isJobFinished());
 		assertFalse(jobCompletion.isJobFinishedByOther());
@@ -195,7 +196,10 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is failed
-		runner.jobFailed(new Exception("failed manually"));
+		runner.jobFailed(new JobResult.Builder()
+			.jobId(new JobID())
+			.serializedThrowable(new SerializedThrowable(new Exception("failed manually")))
+			.build());
 
 		assertTrue(jobCompletion.isJobFailed());
 		verify(leaderElectionService).stop();
@@ -239,14 +243,14 @@ public class JobManagerRunnerMockTest extends TestLogger {
 
 	private static class TestingOnCompletionActions implements OnCompletionActions, FatalErrorHandler {
 
-		private volatile JobExecutionResult result;
+		private volatile JobResult result;
 
 		private volatile Throwable failedCause;
 
 		private volatile boolean finishedByOther;
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			checkArgument(!isJobFinished(), "job finished already");
 			checkArgument(!isJobFailed(), "job failed already");
 
@@ -254,11 +258,11 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(JobResult result) {
 			checkArgument(!isJobFinished(), "job finished already");
 			checkArgument(!isJobFailed(), "job failed already");
 
-			this.failedCause = cause;
+			this.failedCause = result.getSerializedThrowable().get();
 		}
 
 		@Override
@@ -271,7 +275,10 @@ public class JobManagerRunnerMockTest extends TestLogger {
 
 		@Override
 		public void onFatalError(Throwable exception) {
-			jobFailed(exception);
+			checkArgument(!isJobFinished(), "job finished already");
+			checkArgument(!isJobFailed(), "job failed already");
+
+			this.failedCause = exception;
 		}
 
 		boolean isJobFinished() {

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e4f9fc2..d77a1d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -266,12 +265,12 @@ public class JobMasterTest extends TestLogger {
 	private static final class NoOpOnCompletionActions implements OnCompletionActions {
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(final JobResult result) {
 
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(final JobResult result) {
 
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
new file mode 100644
index 0000000..1c7f0dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link JobResult}.
+ */
+@Category(Flip6.class)
+public class JobResultTest extends TestLogger {
+
+	@Test
+	public void testNetRuntimeMandatory() {
+		try {
+			new JobResult.Builder()
+				.jobId(new JobID())
+				.build();
+			fail("Expected exception not thrown");
+		} catch (final IllegalArgumentException e) {
+			assertThat(e.getMessage(), equalTo("netRuntime must be greater than or equals 0"));
+		}
+	}
+
+	@Test
+	public void testIsNotSuccess() throws Exception {
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(new JobID())
+			.serializedThrowable(new SerializedThrowable(new RuntimeException()))
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+
+		assertThat(jobResult.isSuccess(), equalTo(false));
+	}
+
+	@Test
+	public void testIsSuccess() throws Exception {
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(new JobID())
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+
+		assertThat(jobResult.isSuccess(), equalTo(true));
+	}
+
+}


[4/4] flink git commit: [FLINK-7949] Add unit test for AsyncWaitOperator recovery with full queue

Posted by tr...@apache.org.
[FLINK-7949] Add unit test for AsyncWaitOperator recovery with full queue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ee4d06a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ee4d06a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ee4d06a

Branch: refs/heads/master
Commit: 2ee4d06ac879e955df0648dd0988f081a617e077
Parents: 6593e0f
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 10 18:53:38 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 12:12:51 2018 +0100

----------------------------------------------------------------------
 .../operators/async/AsyncWaitOperatorTest.java  | 123 ++++++++++++++++++-
 1 file changed, 122 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ee4d06a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index f741053..42e1197 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -56,6 +57,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.AcknowledgeStreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -65,6 +67,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -74,10 +77,12 @@ import org.mockito.stubbing.Answer;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -88,6 +93,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
@@ -977,6 +983,122 @@ public class AsyncWaitOperatorTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that the AsyncWaitOperator can restart if the checkpointed queue was full.
+	 *
+	 * <p>See FLINK-7949
+	 */
+	@Test(timeout = 10000)
+	public void testRestartWithFullQueue() throws Exception {
+		int capacity = 10;
+
+		// 1. create the snapshot which contains capacity + 1 elements
+		final CompletableFuture<Void> trigger = new CompletableFuture<>();
+		final ControllableAsyncFunction<Integer> controllableAsyncFunction = new ControllableAsyncFunction<>(trigger);
+
+		final OneInputStreamOperatorTestHarness<Integer, Integer> snapshotHarness = new OneInputStreamOperatorTestHarness<>(
+			new AsyncWaitOperator<>(
+				controllableAsyncFunction, // completes results only once the trigger future is completed
+				1000L,
+				capacity,
+				AsyncDataStream.OutputMode.ORDERED),
+			IntSerializer.INSTANCE);
+
+		snapshotHarness.open();
+
+		final OperatorStateHandles snapshot;
+
+		final ArrayList<Integer> expectedOutput = new ArrayList<>(capacity + 1);
+
+		try {
+			synchronized (snapshotHarness.getCheckpointLock()) {
+				for (int i = 0; i < capacity; i++) {
+					snapshotHarness.processElement(i, 0L);
+					expectedOutput.add(i);
+				}
+			}
+
+			expectedOutput.add(capacity);
+
+			final OneShotLatch lastElement = new OneShotLatch();
+
+			final CheckedThread lastElementWriter = new CheckedThread() {
+				@Override
+				public void go() throws Exception {
+					synchronized (snapshotHarness.getCheckpointLock()) {
+						lastElement.trigger();
+						snapshotHarness.processElement(capacity, 0L);
+					}
+				}
+			};
+
+			lastElementWriter.start();
+
+			lastElement.await();
+
+			synchronized (snapshotHarness.getCheckpointLock()) {
+				// execute the snapshot within the checkpoint lock, because then it is guaranteed
+				// that the lastElementWriter has written the exceeding element
+				snapshot = snapshotHarness.snapshot(0L, 0L);
+			}
+
+			// trigger the computation to make the close call finish
+			trigger.complete(null);
+		} finally {
+			synchronized (snapshotHarness.getCheckpointLock()) {
+				snapshotHarness.close();
+			}
+		}
+
+		// 2. restore the snapshot and check that we complete
+		final OneInputStreamOperatorTestHarness<Integer, Integer> recoverHarness = new OneInputStreamOperatorTestHarness<>(
+			new AsyncWaitOperator<>(
+				new ControllableAsyncFunction<>(CompletableFuture.completedFuture(null)),
+				1000L,
+				capacity,
+				AsyncDataStream.OutputMode.ORDERED),
+			IntSerializer.INSTANCE);
+
+		recoverHarness.initializeState(snapshot);
+
+		synchronized (recoverHarness.getCheckpointLock()) {
+			recoverHarness.open();
+		}
+
+		synchronized (recoverHarness.getCheckpointLock()) {
+			recoverHarness.close();
+		}
+
+		final ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput();
+
+		assertThat(output.size(), Matchers.equalTo(capacity + 1));
+
+		final ArrayList<Integer> outputElements = new ArrayList<>(capacity + 1);
+
+		for (int i = 0; i < capacity + 1; i++) {
+			StreamRecord<Integer> streamRecord = ((StreamRecord<Integer>) output.poll());
+			outputElements.add(streamRecord.getValue());
+		}
+
+		assertThat(outputElements, Matchers.equalTo(expectedOutput));
+	}
+
+	private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> {
+
+		private static final long serialVersionUID = -4214078239267288636L;
+
+		private transient CompletableFuture<Void> trigger;
+
+		private ControllableAsyncFunction(CompletableFuture<Void> trigger) {
+			this.trigger = Preconditions.checkNotNull(trigger);
+		}
+
+		@Override
+		public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception {
+			trigger.thenAccept(v -> resultFuture.complete(Collections.singleton(input)));
+		}
+	}
+
 	private static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> {
 		private static final long serialVersionUID = -3060481953330480694L;
 
@@ -985,5 +1107,4 @@ public class AsyncWaitOperatorTest extends TestLogger {
 			// no op
 		}
 	}
-
 }


[3/4] flink git commit: [FLINK-7949] Make AsyncWaitOperator recoverable also when queue is full

Posted by tr...@apache.org.
[FLINK-7949] Make AsyncWaitOperator recoverable also when queue is full

Start emitter thread BEFORE filling up the queue of recovered elements.
This guarantees that we won't deadlock inserting the recovered elements,
because the emitter can already start processing elements.

This closes #4924.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6593e0f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6593e0f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6593e0f9

Branch: refs/heads/master
Commit: 6593e0f9e6db9d862afadeeae79e0afa2590fd73
Parents: 08e550c
Author: Bartłomiej Tartanus <ba...@gmail.com>
Authored: Mon Oct 30 15:39:43 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 12:12:51 2018 +0100

----------------------------------------------------------------------
 .../api/operators/async/AsyncWaitOperator.java      | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6593e0f9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index aec20c0..a7b9438 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -163,6 +163,14 @@ public class AsyncWaitOperator<IN, OUT>
 	public void open() throws Exception {
 		super.open();
 
+		// create the emitter
+		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
+
+		// start the emitter thread
+		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
+		emitterThread.setDaemon(true);
+		emitterThread.start();
+
 		// process stream elements from state, since the Emit thread will start as soon as all
 		// elements from previous state are in the StreamElementQueue, we have to make sure that the
 		// order to open all operators in the operator chain proceeds from the tail operator to the
@@ -186,14 +194,6 @@ public class AsyncWaitOperator<IN, OUT>
 			recoveredStreamElements = null;
 		}
 
-		// create the emitter
-		this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
-
-		// start the emitter thread
-		this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
-		emitterThread.setDaemon(true);
-		emitterThread.start();
-
 	}
 
 	@Override


[2/4] flink git commit: [FLINK-8404] [tests] Mark Flip-6 tests with Flip6 category annotation

Posted by tr...@apache.org.
[FLINK-8404] [tests] Mark Flip-6 tests with Flip6 category annotation

Marks all existing Flip-6 test cases with the Flip6 category annotation. That
way they are only run if the Flip-6 test profile is active.

This closes #5278.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8353123b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8353123b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8353123b

Branch: refs/heads/master
Commit: 8353123b39ea49cc5b3bd9d324972bf6ae02f2a6
Parents: 6c078c0
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jan 10 14:13:38 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jan 11 12:12:50 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java  | 3 +++
 .../flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java  | 3 +++
 .../flink/runtime/jobmaster/slotpool/DualKeyMapTest.java      | 3 +++
 .../flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java     | 3 +++
 .../jobmaster/slotpool/SlotPoolSchedulingTestBase.java        | 3 +++
 .../apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java | 3 +++
 .../runtime/jobmaster/slotpool/SlotSharingManagerTest.java    | 3 +++
 .../flink/runtime/resourcemanager/ResourceManagerTest.java    | 3 +++
 .../org/apache/flink/runtime/rest/RestServerEndpointTest.java | 3 +++
 .../rest/handler/job/metrics/AbstractMetricsHandlerTest.java  | 3 +++
 .../rest/handler/job/metrics/MetricsHandlerTestBase.java      | 6 ++++--
 .../rest/messages/job/metrics/AbstractMetricsHeadersTest.java | 3 +++
 .../messages/job/metrics/JobManagerMetricsHeadersTest.java    | 7 ++++++-
 .../rest/messages/job/metrics/JobMetricsHeadersTest.java      | 6 +++++-
 .../messages/job/metrics/JobVertexMetricsHeadersTest.java     | 6 +++++-
 .../rest/messages/job/metrics/MetricsFilterParameterTest.java | 3 +++
 .../rest/messages/job/metrics/SubtaskMetricsHeadersTest.java  | 6 +++++-
 .../messages/job/metrics/TaskManagerMetricsHeadersTest.java   | 6 +++++-
 .../messages/taskmanager/TaskManagerIdPathParameterTest.java  | 6 +++++-
 .../runtime/taskexecutor/NetworkBufferCalculationTest.java    | 3 +++
 .../runtime/taskexecutor/TestingTaskExecutorGateway.java      | 4 ++++
 21 files changed, 78 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
index 4dee924..f97ff4f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AllocatedSlotsTest.java
@@ -25,15 +25,18 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGate
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+@Category(Flip6.class)
 public class AllocatedSlotsTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
index 4835c57..2d18c65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AvailableSlotsTest.java
@@ -24,9 +24,11 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -35,6 +37,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@Category(Flip6.class)
 public class AvailableSlotsTest extends TestLogger {
 
 	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
index 1500d24..4504b5a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMapTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.HashSet;
 import java.util.Random;
@@ -34,6 +36,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 /**
  * Tests for the {@link DualKeyMap}.
  */
+@Category(Flip6.class)
 public class DualKeyMapTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index 2d862c5..e60a3d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.clock.Clock;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -54,6 +55,7 @@ import akka.pattern.AskTimeoutException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import javax.annotation.Nullable;
 
@@ -71,6 +73,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the SlotPool using a proper RPC setup.
  */
+@Category(Flip6.class)
 public class SlotPoolRpcTest extends TestLogger {
 
 	private static RpcService rpcService;

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
index 31be1ae..4cd7782 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java
@@ -24,12 +24,14 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -37,6 +39,7 @@ import java.util.concurrent.TimeoutException;
 /**
  * Test base for {@link SlotPool} related scheduling test cases.
  */
+@Category(Flip6.class)
 public class SlotPoolSchedulingTestBase extends TestLogger {
 
 	private static final JobID jobId = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 6a8ef0a..ef79be1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -48,6 +49,7 @@ import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +72,7 @@ import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@Category(Flip6.class)
 public class SlotPoolTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
index c355f38..c18b3b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
@@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotContext;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.Collections;
 import java.util.Objects;
@@ -49,6 +51,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * Test cases for the {@link SlotSharingManager}.
  */
+@Category(Flip6.class)
 public class SlotSharingManagerTest extends TestLogger {
 
 	private static final SlotSharingGroupId SLOT_SHARING_GROUP_ID = new SlotSharingGroupId();

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 87cb4a9..8f35b13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -35,16 +35,19 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
+@Category(Flip6.class)
 public class ResourceManagerTest extends TestLogger {
 
 	private TestingRpcService rpcService;

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
index 3cb4a55..866dfbf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,6 +33,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Test cases for the {@link RestServerEndpoint}.
  */
+@Category(Flip6.class)
 public class RestServerEndpointTest extends TestLogger {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
index 241bbba..80a8759 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractMetricsHandlerTest.java
@@ -35,10 +35,12 @@ import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
@@ -59,6 +61,7 @@ import static org.mockito.Mockito.when;
 /**
  * Tests for {@link AbstractMetricsHandler}.
  */
+@Category(Flip6.class)
 public class AbstractMetricsHandlerTest extends TestLogger {
 
 	private static final String TEST_METRIC_NAME = "test_counter";

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
index 05a1163..92ee185 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/MetricsHandlerTestBase.java
@@ -29,10 +29,12 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
 import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
@@ -49,8 +51,8 @@ import static org.mockito.Mockito.when;
 /**
  * Unit test base class for subclasses of {@link AbstractMetricsHandler}.
  */
-public abstract class MetricsHandlerTestBase<T extends
-	AbstractMetricsHandler> extends TestLogger {
+@Category(Flip6.class)
+public abstract class MetricsHandlerTestBase<T extends AbstractMetricsHandler> extends TestLogger {
 
 	private static final String TEST_METRIC_NAME = "test_counter";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
index 51a6ff8..5228e0e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/AbstractMetricsHeadersTest.java
@@ -21,12 +21,14 @@ package org.apache.flink.runtime.rest.messages.job.metrics;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -34,6 +36,7 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link AbstractMetricsHeaders}.
  */
+@Category(Flip6.class)
 public class AbstractMetricsHeadersTest extends TestLogger {
 
 	private AbstractMetricsHeaders<EmptyMessageParameters> metricsHandlerHeaders;

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
index f6c9171..e87d3de 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobManagerMetricsHeadersTest.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.runtime.rest.messages.job.metrics;
 
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -27,7 +31,8 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link JobManagerMetricsHeaders}.
  */
-public class JobManagerMetricsHeadersTest {
+@Category(Flip6.class)
+public class JobManagerMetricsHeadersTest extends TestLogger {
 
 	private final JobManagerMetricsHeaders jobManagerMetricsHeaders =
 		JobManagerMetricsHeaders.getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
index a623eba..515c7c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobMetricsHeadersTest.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.rest.messages.job.metrics;
 
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -29,7 +32,8 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link JobMetricsHeaders}.
  */
-public class JobMetricsHeadersTest {
+@Category(Flip6.class)
+public class JobMetricsHeadersTest extends TestLogger {
 
 	private final JobMetricsHeaders jobMetricsHeaders = JobMetricsHeaders.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
index fd8283a..f20abdb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/JobVertexMetricsHeadersTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.rest.messages.job.metrics;
 
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -30,7 +33,8 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link JobVertexMetricsHeaders}.
  */
-public class JobVertexMetricsHeadersTest {
+@Category(Flip6.class)
+public class JobVertexMetricsHeadersTest extends TestLogger {
 
 	private final JobVertexMetricsHeaders jobVertexMetricsHeaders = JobVertexMetricsHeaders
 		.getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
index 2756a65..7e1812f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/MetricsFilterParameterTest.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.runtime.rest.messages.job.metrics;
 
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertFalse;
@@ -30,6 +32,7 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link MetricsFilterParameter}.
  */
+@Category(Flip6.class)
 public class MetricsFilterParameterTest extends TestLogger {
 
 	private MetricsFilterParameter metricsFilterParameter;

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
index 345ad74..0f82465 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsHeadersTest.java
@@ -21,8 +21,11 @@ package org.apache.flink.runtime.rest.messages.job.metrics;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -31,7 +34,8 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link SubtaskMetricsHeaders}.
  */
-public class SubtaskMetricsHeadersTest {
+@Category(Flip6.class)
+public class SubtaskMetricsHeadersTest extends TestLogger {
 
 	private final SubtaskMetricsHeaders subtaskMetricsHeaders = SubtaskMetricsHeaders.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
index ee2848a..477e9f8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/metrics/TaskManagerMetricsHeadersTest.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.rest.messages.job.metrics;
 
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -29,7 +32,8 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link TaskManagerMetricsHeaders}.
  */
-public class TaskManagerMetricsHeadersTest {
+@Category(Flip6.class)
+public class TaskManagerMetricsHeadersTest extends TestLogger {
 
 	private final TaskManagerMetricsHeaders taskManagerMetricsHeaders =
 		TaskManagerMetricsHeaders.getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
index f9c234f..379fe4c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerIdPathParameterTest.java
@@ -19,9 +19,12 @@
 package org.apache.flink.runtime.rest.messages.taskmanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
@@ -30,7 +33,8 @@ import static org.junit.Assert.assertTrue;
 /**
  * Tests for {@link TaskManagerIdPathParameter}.
  */
-public class TaskManagerIdPathParameterTest {
+@Category(Flip6.class)
+public class TaskManagerIdPathParameterTest extends TestLogger {
 
 	private TaskManagerIdPathParameter taskManagerIdPathParameter;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index eb879c4..6c145e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -22,9 +22,11 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.testutils.category.OldAndFlip6;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -45,6 +47,7 @@ import static org.powermock.api.mockito.PowerMockito.when;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(EnvironmentInformation.class)
+@Category(OldAndFlip6.class)
 public class NetworkBufferCalculationTest extends TestLogger {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8353123b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index aa6c872..4c79ec4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -31,14 +31,18 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.Preconditions;
 
+import org.junit.experimental.categories.Category;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 /**
  * Simple {@link TaskExecutorGateway} implementation for testing purposes.
  */
+@Category(Flip6.class)
 public class TestingTaskExecutorGateway implements TaskExecutorGateway {
 
 	private final String address;