You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/12 13:17:43 UTC

[GitHub] GJL closed pull request #6820: [BP-1.5][FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

GJL closed pull request #6820: [BP-1.5][FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6820
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index e0561a7e177..dfee20e3d6a 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -503,10 +503,6 @@ protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
 				}
 			}
 
-			if (webMonitorEndpoint != null) {
-				terminationFutures.add(webMonitorEndpoint.closeAsync());
-			}
-
 			if (dispatcher != null) {
 				dispatcher.shutDown();
 				terminationFutures.add(dispatcher.getTerminationFuture());
@@ -566,10 +562,13 @@ private Configuration generateClusterConfiguration(Configuration configuration)
 		if (isShutDown.compareAndSet(false, true)) {
 			LOG.info("Stopping {}.", getClass().getSimpleName());
 
-			final CompletableFuture<Void> shutDownApplicationFuture = deregisterApplication(applicationStatus, diagnostics);
+			final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
+				FutureUtils.composeAfterwards(
+					webMonitorEndpoint.closeAsync(),
+					() -> deregisterApplication(applicationStatus, diagnostics));
 
 			final CompletableFuture<Void> componentShutdownFuture = FutureUtils.composeAfterwards(
-				shutDownApplicationFuture,
+				closeWebMonitorAndDeregisterAppFuture,
 				this::stopClusterComponents);
 
 			final CompletableFuture<Void> serviceShutdownFuture = FutureUtils.composeAfterwards(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 13a06b41e91..cd102b32f89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -676,7 +676,7 @@ public void acknowledgeCheckpoint(
 				try {
 					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
 				} catch (Throwable t) {
-					log.warn("Error while processing checkpoint acknowledgement message");
+					log.warn("Error while processing checkpoint acknowledgement message", t);
 				}
 			});
 		} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 3e85271863e..3a573ac9fcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -64,6 +64,7 @@
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * An abstract class for netty-based REST server endpoints.
@@ -86,6 +87,7 @@
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
 	private ServerBootstrap bootstrap;
 	private Channel serverChannel;
 	private String restBaseUrl;
@@ -132,7 +134,7 @@ public final void start() throws Exception {
 			final Router router = new Router();
 			final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
 
-			List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);
+			handlers = initializeHandlers(restAddressFuture);
 
 			/* sort the handlers such that they are ordered the following:
 			 * /jobs
@@ -266,7 +268,9 @@ public String getRestBaseUrl() {
 			log.info("Shutting down rest endpoint.");
 
 			if (state == State.RUNNING) {
-				final CompletableFuture<Void> shutDownFuture = shutDownInternal();
+				final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
+					closeHandlersAsync(),
+					this::shutDownInternal);
 
 				shutDownFuture.whenComplete(
 					(Void ignored, Throwable throwable) -> {
@@ -286,6 +290,14 @@ public String getRestBaseUrl() {
 		}
 	}
 
+	private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
+		return FutureUtils.waitForAll(handlers.stream()
+			.map(tuple -> tuple.f1)
+			.filter(handler -> handler instanceof AutoCloseableAsync)
+			.map(handler -> ((AutoCloseableAsync) handler).closeAsync())
+			.collect(Collectors.toList()));
+	}
+
 	/**
 	 * Stops this REST server endpoint.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
similarity index 86%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 0a0d833626e..b9ce232d1ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
@@ -32,6 +29,7 @@
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -64,7 +62,7 @@
  * @param <R> type of the incoming request
  * @param <M> type of the message parameters
  */
-public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> {
+public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> implements AutoCloseableAsync {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -72,6 +70,12 @@
 
 	private final UntypedResponseMessageHeaders<R, M> untypedResponseMessageHeaders;
 
+	/**
+	 * Used to ensure that the handler is not closed while there are still in-flight requests
+	 * dispatched outside of Netty's {@link org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor}.
+	 */
+	private final InFlightRequestTracker inFlightRequestTracker;
+
 	protected AbstractHandler(
 			@Nonnull CompletableFuture<String> localAddressFuture,
 			@Nonnull GatewayRetriever<? extends T> leaderRetriever,
@@ -81,6 +85,7 @@ protected AbstractHandler(
 		super(localAddressFuture, leaderRetriever, timeout, responseHeaders);
 
 		this.untypedResponseMessageHeaders = Preconditions.checkNotNull(untypedResponseMessageHeaders);
+		this.inFlightRequestTracker = new InFlightRequestTracker();
 	}
 
 	@Override
@@ -93,6 +98,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gatew
 
 		FileUploads uploadedFiles = null;
 		try {
+			inFlightRequestTracker.registerRequest();
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
@@ -150,8 +156,12 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gatew
 
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
-				.whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+				.whenComplete((Void ignored, Throwable throwable) -> {
+					inFlightRequestTracker.deregisterRequest();
+					cleanupFileUploads(finalUploadedFiles);
+				});
 		} catch (RestHandlerException rhe) {
+			inFlightRequestTracker.deregisterRequest();
 			HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
@@ -160,6 +170,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gatew
 				responseHeaders);
 			cleanupFileUploads(uploadedFiles);
 		} catch (Throwable e) {
+			inFlightRequestTracker.deregisterRequest();
 			log.error("Request processing failed.", e);
 			HandlerUtils.sendErrorResponse(
 				ctx,
@@ -171,6 +182,15 @@ protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, T gatew
 		}
 	}
 
+	@Override
+	public final CompletableFuture<Void> closeAsync() {
+		return FutureUtils.composeAfterwards(closeHandlerAsync(), inFlightRequestTracker::awaitAsync);
+	}
+
+	protected CompletableFuture<Void> closeHandlerAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
 	private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
 		if (uploadedFiles != null) {
 			try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 9cfb58e98a8..29c0633980c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rest.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
new file mode 100644
index 00000000000..92478b1886f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
+
+/**
+ * Tracks in-flight client requests.
+ *
+ * @see AbstractHandler
+ */
+@ThreadSafe
+class InFlightRequestTracker {
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private final Phaser phaser = new Phaser(1) {
+		@Override
+		protected boolean onAdvance(final int phase, final int registeredParties) {
+			terminationFuture.complete(null);
+			return true;
+		}
+	};
+
+	/**
+	 * Registers an in-flight request.
+	 */
+	public void registerRequest() {
+		phaser.register();
+	}
+
+	/**
+	 * Deregisters an in-flight request.
+	 */
+	public void deregisterRequest() {
+		phaser.arriveAndDeregister();
+	}
+
+	/**
+	 * Returns a future that completes when the in-flight requests that were registered prior to
+	 * calling this method are deregistered.
+	 */
+	public CompletableFuture<Void> awaitAsync() {
+		phaser.arriveAndDeregister();
+		return terminationFuture;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
index e0c3fbec152..32b04921e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.async;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.NotFoundException;
@@ -29,24 +28,14 @@
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * HTTP handlers for asynchronous operations.
@@ -176,7 +165,7 @@ protected StatusHandler(
 			final Either<Throwable, R> operationResultOrError;
 			try {
 				operationResultOrError = completedOperationCache.get(key);
-			} catch (UnknownOperationKey e) {
+			} catch (UnknownOperationKeyException e) {
 				return FutureUtils.completedExceptionally(
 					new NotFoundException("Operation not found under key: " + key, e));
 			}
@@ -194,6 +183,11 @@ protected StatusHandler(
 			}
 		}
 
+		@Override
+		public CompletableFuture<Void> closeHandlerAsync() {
+			return completedOperationCache.closeAsync();
+		}
+
 		/**
 		 * Extract the operation key under which the operation result future is stored.
 		 *
@@ -220,79 +214,4 @@ protected StatusHandler(
 		protected abstract V operationResultResponse(R operationResult);
 	}
 
-	/**
-	 * Cache to manage ongoing operations.
-	 *
-	 * <p>The cache allows to register ongoing operations by calling
-	 * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
-	 * {@code CompletableFuture} contains the operation result. Completed operations will be
-	 * removed from the cache automatically after a fixed timeout.
-	 */
-	@ThreadSafe
-	protected static class CompletedOperationCache<K, R> {
-
-		private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
-
-		/**
-		 * Stores SavepointKeys of ongoing savepoint.
-		 * If the savepoint completes, it will be moved to {@link #completedOperations}.
-		 */
-		private final Set<K> registeredOperationTriggers = ConcurrentHashMap.newKeySet();
-
-		/** Caches the location of completed operations. */
-		private final Cache<K, Either<Throwable, R>> completedOperations =
-			CacheBuilder.newBuilder()
-				.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
-				.build();
-
-		/**
-		 * Registers an ongoing operation with the cache.
-		 *
-		 * @param operationResultFuture A future containing the operation result.
-		 */
-		public void registerOngoingOperation(
-			final K operationKey,
-			final CompletableFuture<R> operationResultFuture) {
-			registeredOperationTriggers.add(operationKey);
-			operationResultFuture.whenComplete((savepointLocation, error) -> {
-				if (error == null) {
-					completedOperations.put(operationKey, Either.Right(savepointLocation));
-				} else {
-					completedOperations.put(operationKey, Either.Left(error));
-				}
-				registeredOperationTriggers.remove(operationKey);
-			});
-		}
-
-		/**
-		 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
-		 * finished, otherwise {@code null}.
-		 *
-		 * @throws UnknownOperationKey If the operation is not found, and there is no ongoing
-		 *                                   operation under the provided key.
-		 */
-		@Nullable
-		public Either<Throwable, R> get(
-			final K operationKey) throws UnknownOperationKey {
-			Either<Throwable, R> operationResultOrError = null;
-			if (!registeredOperationTriggers.contains(operationKey)
-				&& (operationResultOrError = completedOperations.getIfPresent(operationKey)) == null) {
-				throw new UnknownOperationKey(operationKey);
-			}
-			return operationResultOrError;
-		}
-	}
-
-	/**
-	 * Exception that indicates that there is no ongoing or completed savepoint for a given
-	 * {@link JobID} and {@link TriggerId} pair.
-	 */
-	static class UnknownOperationKey extends FlinkException {
-		private static final long serialVersionUID = 1L;
-
-		UnknownOperationKey(final Object operationKey) {
-			super("No ongoing operation for " + operationKey);
-		}
-	}
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
new file mode 100644
index 00000000000..95bb2239fb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * <p>The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {
+
+	private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(CompletedOperationCache.class);
+
+	/**
+	 * In-progress asynchronous operations.
+	 */
+	private final Map<K, ResultAccessTracker<R>> registeredOperationTriggers = new ConcurrentHashMap<>();
+
+	/**
+	 * Caches the result of completed operations.
+	 */
+	private final Cache<K, ResultAccessTracker<R>> completedOperations;
+
+	CompletedOperationCache() {
+		this(Ticker.systemTicker());
+	}
+
+	@VisibleForTesting
+	CompletedOperationCache(final Ticker ticker) {
+		completedOperations = CacheBuilder.newBuilder()
+			.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+			.removalListener((RemovalListener<K, ResultAccessTracker<R>>) removalNotification -> {
+				if (removalNotification.wasEvicted()) {
+					Preconditions.checkState(removalNotification.getKey() != null);
+					Preconditions.checkState(removalNotification.getValue() != null);
+
+					// When shutting down the cache, we wait until all results are accessed.
+					// When a result gets evicted from the cache, it will not be possible to access
+					// it any longer, and we might be in the process of shutting down, so we mark
+					// the result as accessed to avoid waiting indefinitely.
+					removalNotification.getValue().markAccessed();
+
+					LOGGER.info("Evicted result with trigger id {} because its TTL of {}s has expired.",
+						removalNotification.getKey().getTriggerId(),
+						COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS);
+				}
+			})
+			.ticker(ticker)
+			.build();
+	}
+
+	/**
+	 * Registers an ongoing operation with the cache.
+	 *
+	 * @param operationResultFuture A future containing the operation result.
+	 */
+	public void registerOngoingOperation(
+			final K operationKey,
+			final CompletableFuture<R> operationResultFuture) {
+		final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
+		registeredOperationTriggers.put(operationKey, inProgress);
+		operationResultFuture.whenComplete((result, error) -> {
+			if (error == null) {
+				completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
+			} else {
+				completedOperations.put(operationKey, inProgress.finishOperation(Either.Left(error)));
+			}
+			registeredOperationTriggers.remove(operationKey);
+		});
+	}
+
+	/**
+	 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
+	 * finished, otherwise {@code null}.
+	 *
+	 * @throws UnknownOperationKeyException If the operation is not found, and there is no ongoing
+	 *                                      operation under the provided key.
+	 */
+	@Nullable
+	public Either<Throwable, R> get(
+			final K operationKey) throws UnknownOperationKeyException {
+		ResultAccessTracker<R> resultAccessTracker;
+		if ((resultAccessTracker = registeredOperationTriggers.get(operationKey)) == null
+			&& (resultAccessTracker = completedOperations.getIfPresent(operationKey)) == null) {
+			throw new UnknownOperationKeyException(operationKey);
+		}
+
+		return resultAccessTracker.accessOperationResultOrError();
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return FutureUtils.orTimeout(
+			asyncWaitForResultsToBeAccessed(),
+			COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS,
+			TimeUnit.SECONDS);
+	}
+
+	private CompletableFuture<Void> asyncWaitForResultsToBeAccessed() {
+		return FutureUtils.waitForAll(
+			Stream.concat(registeredOperationTriggers.values().stream(), completedOperations.asMap().values().stream())
+				.map(ResultAccessTracker::getAccessedFuture)
+				.collect(Collectors.toList()));
+	}
+
+	@VisibleForTesting
+	void cleanUp() {
+		completedOperations.cleanUp();
+	}
+
+	/**
+	 * Stores the result of an asynchronous operation, and tracks accesses to it.
+	 */
+	private static class ResultAccessTracker<R> {
+
+		/** Result of an asynchronous operation. Null if operation is in progress. */
+		@Nullable
+		private final Either<Throwable, R> operationResultOrError;
+
+		/** Future that completes if a non-null {@link #operationResultOrError} is accessed. */
+		private final CompletableFuture<Void> accessed;
+
+		private static <R> ResultAccessTracker<R> inProgress() {
+			return new ResultAccessTracker<>();
+		}
+
+		private ResultAccessTracker() {
+			this.operationResultOrError = null;
+			this.accessed = new CompletableFuture<>();
+		}
+
+		private ResultAccessTracker(final Either<Throwable, R> operationResultOrError, final CompletableFuture<Void> accessed) {
+			this.operationResultOrError = checkNotNull(operationResultOrError);
+			this.accessed = checkNotNull(accessed);
+		}
+
+		/**
+		 * Creates a new instance of the tracker with the result of the asynchronous operation set.
+		 */
+		public ResultAccessTracker<R> finishOperation(final Either<Throwable, R> operationResultOrError) {
+			checkState(this.operationResultOrError == null);
+
+			return new ResultAccessTracker<>(checkNotNull(operationResultOrError), this.accessed);
+		}
+
+		/**
+		 * If present, returns the result of the asynchronous operation, and marks the result as
+		 * accessed. If the result is not present, this method returns null.
+		 */
+		@Nullable
+		public Either<Throwable, R> accessOperationResultOrError() {
+			if (operationResultOrError != null) {
+				markAccessed();
+			}
+			return operationResultOrError;
+		}
+
+		public CompletableFuture<Void> getAccessedFuture() {
+			return accessed;
+		}
+
+		private void markAccessed() {
+			accessed.complete(null);
+		}
+
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
new file mode 100644
index 00000000000..962b80fe5e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception that indicates that there is no ongoing or completed savepoint for a given
+ * {@link JobID} and {@link TriggerId} pair.
+ */
+class UnknownOperationKeyException extends FlinkException {
+	private static final long serialVersionUID = 1L;
+
+	UnknownOperationKeyException(final Object operationKey) {
+		super("No ongoing operation for " + operationKey);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
index 4bb473e9ceb..e773702274f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
@@ -33,7 +33,7 @@
  * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
  * collection.
  *
- * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ * @see AbstractAsynchronousOperationHandlers
  */
 @Immutable
 public class AsynchronousJobOperationKey extends OperationKey {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 303f7d0ca24..4fd6d829df9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,7 +24,7 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
-import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
index c289634bc07..cc116dd7714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ManualTicker;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -281,20 +282,6 @@ private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File st
 			Ticker.systemTicker());
 	}
 
-	private static final class ManualTicker extends Ticker {
-
-		private long currentTime = 0;
-
-		@Override
-		public long read() {
-			return currentTime;
-		}
-
-		void advanceTime(long duration, TimeUnit timeUnit) {
-			currentTime += timeUnit.toNanos(duration);
-		}
-	}
-
 	private static final class PartialArchivedExecutionGraphMatcher extends BaseMatcher<ArchivedExecutionGraph> {
 
 		private final ArchivedExecutionGraph archivedExecutionGraph;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 59db1630cce..cc145bdf58c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -90,6 +90,7 @@
 import static java.util.Objects.requireNonNull;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -121,6 +122,8 @@
 	private final Configuration config;
 	private SSLContext defaultSSLContext;
 
+	private TestHandler testHandler;
+
 	public RestServerEndpointITCase(final Configuration config) {
 		this.config = requireNonNull(config);
 	}
@@ -179,7 +182,7 @@ public void setup() throws Exception {
 		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
 			CompletableFuture.completedFuture(mockRestfulGateway);
 
-		TestHandler testHandler = new TestHandler(
+		testHandler = new TestHandler(
 			CompletableFuture.completedFuture(restAddress),
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
@@ -217,7 +220,7 @@ public void teardown() throws Exception {
 		}
 
 		if (serverEndpoint != null) {
-			serverEndpoint.close();
+			serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
 			serverEndpoint = null;
 		}
 	}
@@ -413,6 +416,49 @@ public void testStaticFileServerHandler() throws Exception {
 		assertEquals("foobar", fileContents.trim());
 	}
 
+	/**
+	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
+	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
+	 * HTTP requests should be served.
+	 */
+	@Test
+	public void testShouldWaitForHandlersWhenClosing() throws Exception {
+		final CompletableFuture<Void> closeHandlerFuture = new CompletableFuture<>();
+		testHandler.closeFuture = closeHandlerFuture;
+
+		// Initiate closing RestServerEndpoint but the handler should block.
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		final TestParameters parameters = new TestParameters();
+		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+
+		final CompletableFuture<TestResponse> request;
+		synchronized (TestHandler.LOCK) {
+			request = restClient.sendRequest(
+				serverAddress.getHostName(),
+				serverAddress.getPort(),
+				new TestHeaders(),
+				parameters,
+				new TestRequest(1));
+			TestHandler.LOCK.wait();
+		}
+
+		// Allow handler to close but there is still one in-flight request which should prevent
+		// the RestServerEndpoint from closing.
+		closeHandlerFuture.complete(null);
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		// Finish the in-flight request.
+		synchronized (TestHandler.LOCK) {
+			TestHandler.LOCK.notifyAll();
+		}
+
+		request.get(timeout.getSize(), timeout.getUnit());
+		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+	}
+
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
 			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -459,6 +505,8 @@ protected void startInternal() {}
 
 		private static final int LARGE_RESPONSE_BODY_ID = 3;
 
+		private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
+
 		TestHandler(
 				CompletableFuture<String> localAddressFuture,
 				GatewayRetriever<RestfulGateway> leaderRetriever,
@@ -478,19 +526,30 @@ protected void startInternal() {}
 
 			final int id = request.getRequestBody().id;
 			if (id == 1) {
-				synchronized (LOCK) {
-					try {
-						LOCK.notifyAll();
-						LOCK.wait();
-					} catch (InterruptedException ignored) {
+				// Intentionally schedule the work on a different thread. This is to simulate
+				// handlers where the CompletableFuture is finished by the RPC framework.
+				return CompletableFuture.supplyAsync(() -> {
+					synchronized (LOCK) {
+						try {
+							LOCK.notifyAll();
+							LOCK.wait();
+						} catch (InterruptedException ignored) {
+						}
 					}
-				}
+					return new TestResponse(id);
+				});
 			} else if (id == LARGE_RESPONSE_BODY_ID) {
 				return CompletableFuture.completedFuture(new TestResponse(
 					id,
 					createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+			} else {
+				return CompletableFuture.completedFuture(new TestResponse(id));
 			}
-			return CompletableFuture.completedFuture(new TestResponse(id));
+		}
+
+		@Override
+		public CompletableFuture<Void> closeHandlerAsync() {
+			return closeFuture;
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
similarity index 96%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
index e759d305d6e..c4e940712c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
 
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
new file mode 100644
index 00000000000..c486571d9cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link InFlightRequestTracker}.
+ */
+public class InFlightRequestTrackerTest {
+
+	private InFlightRequestTracker inFlightRequestTracker;
+
+	@Before
+	public void setUp() {
+		inFlightRequestTracker = new InFlightRequestTracker();
+	}
+
+	@Test
+	public void testShouldFinishAwaitAsyncImmediatelyIfNoRequests() {
+		assertTrue(inFlightRequestTracker.awaitAsync().isDone());
+	}
+
+	@Test
+	public void testShouldFinishAwaitAsyncIffAllRequestsDeregistered() {
+		inFlightRequestTracker.registerRequest();
+
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+		assertFalse(awaitFuture.isDone());
+
+		inFlightRequestTracker.deregisterRequest();
+		assertTrue(awaitFuture.isDone());
+	}
+
+	@Test
+	public void testAwaitAsyncIsIdempotent() {
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+		assertTrue(awaitFuture.isDone());
+
+		assertSame(
+			"The reference to the future must not change",
+			awaitFuture,
+			inFlightRequestTracker.awaitAsync());
+	}
+
+	@Test
+	public void testShouldTolerateRegisterAfterAwaitAsync() {
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+		assertTrue(awaitFuture.isDone());
+
+		inFlightRequestTracker.registerRequest();
+
+		assertSame(
+			"The reference to the future must not change",
+			awaitFuture,
+			inFlightRequestTracker.awaitAsync());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
index eeb41a829b8..fb384a938d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
@@ -179,6 +179,32 @@ public void testUnknownTriggerId() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that the future returned by {@link AbstractAsynchronousOperationHandlers.StatusHandler#closeAsync()}
+	 * completes when the result of the asynchronous operation is served.
+	 */
+	@Test
+	public void testCloseShouldFinishOnFirstServedResult() throws Exception {
+		final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+		final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
+			.setTriggerSavepointFunction((JobID jobId, String directory) -> savepointFuture)
+			.build();
+
+		final TriggerId triggerId = testingTriggerHandler.handleRequest(
+			triggerOperationRequest(),
+			testingRestfulGateway).get().getTriggerId();
+		final CompletableFuture<Void> closeFuture = testingStatusHandler.closeAsync();
+
+		testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+		assertThat(closeFuture.isDone(), is(false));
+
+		savepointFuture.complete("foobar");
+		testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+		assertThat(closeFuture.isDone(), is(true));
+	}
+
 	private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> triggerOperationRequest() throws HandlerRequestException {
 		return new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance());
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
new file mode 100644
index 00000000000..2d13780f956
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.util.ManualTicker;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link CompletedOperationCache}.
+ */
+public class CompletedOperationCacheTest extends TestLogger {
+
+	private static final OperationKey TEST_OPERATION_KEY = new OperationKey(new TriggerId());
+
+	private static final CompletableFuture<String> TEST_OPERATION_RESULT = CompletableFuture.completedFuture("foo");
+
+	private ManualTicker manualTicker;
+
+	private CompletedOperationCache<OperationKey, String> completedOperationCache;
+
+	@Before
+	public void setUp() {
+		manualTicker = new ManualTicker();
+		completedOperationCache = new CompletedOperationCache<>(manualTicker);
+	}
+
+	@Test
+	public void testShouldFinishClosingCacheIfAllResultsAreEvicted() {
+		completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+		final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+		assertThat(closeCacheFuture.isDone(), is(false));
+
+		manualTicker.advanceTime(300, TimeUnit.SECONDS);
+		completedOperationCache.cleanUp();
+
+		assertThat(closeCacheFuture.isDone(), is(true));
+	}
+
+	@Test
+	public void testShouldFinishClosingCacheIfAllResultsAccessed() throws Exception {
+		completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+		final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+		assertThat(closeCacheFuture.isDone(), is(false));
+
+		final Either<Throwable, String> operationResultOrError = completedOperationCache.get(TEST_OPERATION_KEY);
+
+		assertThat(operationResultOrError, is(notNullValue()));
+		assertThat(operationResultOrError.right(), is(equalTo(TEST_OPERATION_RESULT.get())));
+		assertThat(closeCacheFuture.isDone(), is(true));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
new file mode 100644
index 00000000000..d2e2e1d7bea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Controllable {@link Ticker} implementation for tests.
+ */
+public final class ManualTicker extends Ticker {
+
+	private long currentTime;
+
+	@Override
+	public long read() {
+		return currentTime;
+	}
+
+	public void advanceTime(final long duration, final TimeUnit timeUnit) {
+		currentTime += timeUnit.toNanos(duration);
+	}
+}
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
index 8f56c1fa9a5..42ae7ddf08e 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -35,7 +35,7 @@ log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
 
 log4j.logger.org.apache.directory=OFF
-log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.org.mortbay.log=OFF
 log4j.logger.net.sf.ehcache=OFF
 log4j.logger.org.apache.hadoop.metrics2=OFF
 log4j.logger.org.apache.hadoop.conf.Configuration=OFF


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services