You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/19 07:35:32 UTC

[GitHub] asfgit closed pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations

asfgit closed pull request #6785: [FLINK-10309][rest] Before shutting down cluster, wait for asynchronous operations
URL: https://github.com/apache/flink/pull/6785
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 9eaef34a33a..b2d6d150561 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -445,9 +445,7 @@ private Configuration generateClusterConfiguration(Configuration configuration)
 	private CompletableFuture<Void> closeClusterComponent(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
 		synchronized (lock) {
 			if (clusterComponent != null) {
-				final CompletableFuture<Void> deregisterApplicationFuture = clusterComponent.deregisterApplication(applicationStatus, diagnostics);
-
-				return FutureUtils.runAfterwards(deregisterApplicationFuture, clusterComponent::closeAsync);
+				return clusterComponent.deregisterApplicationAndClose(applicationStatus, diagnostics);
 			} else {
 				return CompletableFuture.completedFuture(null);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
index b07095c2b6d..6e28ab6c6b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/DispatcherResourceManagerComponent.java
@@ -26,7 +26,6 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
-import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nonnull;
@@ -41,7 +40,7 @@
  * Component which starts a {@link Dispatcher}, {@link ResourceManager} and {@link WebMonitorEndpoint}
  * in the same process.
  */
-public class DispatcherResourceManagerComponent<T extends Dispatcher> implements AutoCloseableAsync {
+public class DispatcherResourceManagerComponent<T extends Dispatcher> {
 
 	@Nonnull
 	private final T dispatcher;
@@ -126,65 +125,77 @@ public T getDispatcher() {
 		return webMonitorEndpoint;
 	}
 
-	@Override
-	public CompletableFuture<Void> closeAsync() {
+	/**
+	 * Deregister the Flink application from the resource management system by signalling
+	 * the {@link ResourceManager}.
+	 *
+	 * @param applicationStatus to terminate the application with
+	 * @param diagnostics additional information about the shut down, can be {@code null}
+	 * @return Future which is completed once the shut down
+	 */
+	public CompletableFuture<Void> deregisterApplicationAndClose(
+			final ApplicationStatus applicationStatus,
+			final @Nullable String diagnostics) {
+
 		if (isRunning.compareAndSet(true, false)) {
-			Exception exception = null;
+			final CompletableFuture<Void> closeWebMonitorAndDeregisterAppFuture =
+				FutureUtils.composeAfterwards(webMonitorEndpoint.closeAsync(), () -> deregisterApplication(applicationStatus, diagnostics));
 
-			final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(4);
+			return FutureUtils.composeAfterwards(closeWebMonitorAndDeregisterAppFuture, this::closeAsyncInternal);
+		} else {
+			return terminationFuture;
+		}
+	}
 
-			try {
-				dispatcherLeaderRetrievalService.stop();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
+	private CompletableFuture<Void> deregisterApplication(
+			final ApplicationStatus applicationStatus,
+			final @Nullable String diagnostics) {
 
-			try {
-				resourceManagerRetrievalService.stop();
-			} catch (Exception e) {
-				exception = ExceptionUtils.firstOrSuppressed(e, exception);
-			}
+		final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
+		return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+	}
 
-			terminationFutures.add(webMonitorEndpoint.closeAsync());
+	private CompletableFuture<Void> closeAsyncInternal() {
+		Exception exception = null;
 
-			dispatcher.shutDown();
-			terminationFutures.add(dispatcher.getTerminationFuture());
+		final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
 
-			resourceManager.shutDown();
-			terminationFutures.add(resourceManager.getTerminationFuture());
+		try {
+			dispatcherLeaderRetrievalService.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
 
-			if (exception != null) {
-				terminationFutures.add(FutureUtils.completedExceptionally(exception));
-			}
+		try {
+			resourceManagerRetrievalService.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
 
-			final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
+		dispatcher.shutDown();
+		terminationFutures.add(dispatcher.getTerminationFuture());
 
-			final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
-				componentTerminationFuture,
-				jobManagerMetricGroup::close);
+		resourceManager.shutDown();
+		terminationFutures.add(resourceManager.getTerminationFuture());
 
-			metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
-				if (throwable != null) {
-					terminationFuture.completeExceptionally(throwable);
-				} else {
-					terminationFuture.complete(aVoid);
-				}
-			});
+		if (exception != null) {
+			terminationFutures.add(FutureUtils.completedExceptionally(exception));
 		}
 
-		return terminationFuture;
-	}
+		final CompletableFuture<Void> componentTerminationFuture = FutureUtils.completeAll(terminationFutures);
 
-	/**
-	 * Deregister the Flink application from the resource management system by signalling
-	 * the {@link ResourceManager}.
-	 *
-	 * @param applicationStatus to terminate the application with
-	 * @param diagnostics additional information about the shut down, can be {@code null}
-	 * @return Future which is completed once the shut down
-	 */
-	public CompletableFuture<Void> deregisterApplication(ApplicationStatus applicationStatus, @Nullable String diagnostics) {
-		final ResourceManagerGateway selfGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		return selfGateway.deregisterApplication(applicationStatus, diagnostics).thenApply(ack -> null);
+		final CompletableFuture<Void> metricGroupTerminationFuture = FutureUtils.runAfterwards(
+			componentTerminationFuture,
+			jobManagerMetricGroup::close);
+
+		metricGroupTerminationFuture.whenComplete((aVoid, throwable) -> {
+			if (throwable != null) {
+				terminationFuture.completeExceptionally(throwable);
+			} else {
+				terminationFuture.complete(aVoid);
+			}
+		});
+
+		return terminationFuture;
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 31c6a97124b..5d2d363cf71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -679,7 +679,7 @@ public void acknowledgeCheckpoint(
 				try {
 					checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
 				} catch (Throwable t) {
-					log.warn("Error while processing checkpoint acknowledgement message");
+					log.warn("Error while processing checkpoint acknowledgement message", t);
 				}
 			});
 		} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index 43636dd8f9e..068192cf794 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -63,6 +63,7 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * An abstract class for netty-based REST server endpoints.
@@ -85,6 +86,7 @@
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers;
 	private ServerBootstrap bootstrap;
 	private Channel serverChannel;
 	private String restBaseUrl;
@@ -131,7 +133,7 @@ public final void start() throws Exception {
 			final Router router = new Router();
 			final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
 
-			List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = initializeHandlers(restAddressFuture);
+			handlers = initializeHandlers(restAddressFuture);
 
 			/* sort the handlers such that they are ordered the following:
 			 * /jobs
@@ -265,10 +267,13 @@ public String getRestBaseUrl() {
 			log.info("Shutting down rest endpoint.");
 
 			if (state == State.RUNNING) {
-				final CompletableFuture<Void> shutDownFuture = shutDownInternal();
+				final CompletableFuture<Void> shutDownFuture = FutureUtils.composeAfterwards(
+					closeHandlersAsync(),
+					this::shutDownInternal);
 
 				shutDownFuture.whenComplete(
 					(Void ignored, Throwable throwable) -> {
+						log.info("Shut down complete.");
 						if (throwable != null) {
 							terminationFuture.completeExceptionally(throwable);
 						} else {
@@ -285,6 +290,14 @@ public String getRestBaseUrl() {
 		}
 	}
 
+	private FutureUtils.ConjunctFuture<Void> closeHandlersAsync() {
+		return FutureUtils.waitForAll(handlers.stream()
+			.map(tuple -> tuple.f1)
+			.filter(handler -> handler instanceof AutoCloseableAsync)
+			.map(handler -> ((AutoCloseableAsync) handler).closeAsync())
+			.collect(Collectors.toList()));
+	}
+
 	/**
 	 * Stops this REST server endpoint.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
similarity index 87%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
index 3d1ec9d0066..5a1c371d5a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.FileUploadHandler;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
@@ -33,6 +30,7 @@
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
@@ -64,7 +62,7 @@
  * @param <R> type of the incoming request
  * @param <M> type of the message parameters
  */
-public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> {
+public abstract class AbstractHandler<T extends RestfulGateway, R extends RequestBody, M extends MessageParameters> extends RedirectHandler<T> implements AutoCloseableAsync {
 
 	protected final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -72,6 +70,11 @@
 
 	private final UntypedResponseMessageHeaders<R, M> untypedResponseMessageHeaders;
 
+	/**
+	 * Used to ensure that the handler is not closed while there are still in-flight requests.
+	 */
+	private final InFlightRequestTracker inFlightRequestTracker;
+
 	protected AbstractHandler(
 			@Nonnull CompletableFuture<String> localAddressFuture,
 			@Nonnull GatewayRetriever<? extends T> leaderRetriever,
@@ -81,6 +84,7 @@ protected AbstractHandler(
 		super(localAddressFuture, leaderRetriever, timeout, responseHeaders);
 
 		this.untypedResponseMessageHeaders = Preconditions.checkNotNull(untypedResponseMessageHeaders);
+		this.inFlightRequestTracker = new InFlightRequestTracker();
 	}
 
 	@Override
@@ -92,6 +96,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 
 		FileUploads uploadedFiles = null;
 		try {
+			inFlightRequestTracker.registerRequest();
 			if (!(httpRequest instanceof FullHttpRequest)) {
 				// The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns
 				// FullHttpRequests.
@@ -154,8 +159,12 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 
 			final FileUploads finalUploadedFiles = uploadedFiles;
 			requestProcessingFuture
-				.whenComplete((Void ignored, Throwable throwable) -> cleanupFileUploads(finalUploadedFiles));
+				.whenComplete((Void ignored, Throwable throwable) -> {
+					inFlightRequestTracker.deregisterRequest();
+					cleanupFileUploads(finalUploadedFiles);
+				});
 		} catch (RestHandlerException rhe) {
+			inFlightRequestTracker.deregisterRequest();
 			HandlerUtils.sendErrorResponse(
 				ctx,
 				httpRequest,
@@ -164,6 +173,7 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 				responseHeaders);
 			cleanupFileUploads(uploadedFiles);
 		} catch (Throwable e) {
+			inFlightRequestTracker.deregisterRequest();
 			log.error("Request processing failed.", e);
 			HandlerUtils.sendErrorResponse(
 				ctx,
@@ -175,6 +185,15 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 		}
 	}
 
+	@Override
+	public final CompletableFuture<Void> closeAsync() {
+		return FutureUtils.composeAfterwards(closeHandlerAsync(), inFlightRequestTracker::awaitAsync);
+	}
+
+	protected CompletableFuture<Void> closeHandlerAsync() {
+		return CompletableFuture.completedFuture(null);
+	}
+
 	private void cleanupFileUploads(@Nullable FileUploads uploadedFiles) {
 		if (uploadedFiles != null) {
 			try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 9cfb58e98a8..0397cb875f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.rest.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
 import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -81,11 +80,9 @@ protected AbstractRestHandler(
 			response = FutureUtils.completedExceptionally(e);
 		}
 
-		return response.whenComplete((P resp, Throwable throwable) -> {
-			Tuple2<ResponseBody, HttpResponseStatus> r = throwable != null ?
-				errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode());
-			HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders);
-		}).thenApply(ignored -> null);
+		return response.handle((resp, throwable) -> throwable != null ?
+			errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode()))
+			.thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders));
 	}
 
 	private Tuple2<ResponseBody, HttpResponseStatus> errorResponse(Throwable throwable) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
new file mode 100644
index 00000000000..92478b1886f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/InFlightRequestTracker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
+
+/**
+ * Tracks in-flight client requests.
+ *
+ * @see AbstractHandler
+ */
+@ThreadSafe
+class InFlightRequestTracker {
+
+	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
+
+	private final Phaser phaser = new Phaser(1) {
+		@Override
+		protected boolean onAdvance(final int phase, final int registeredParties) {
+			terminationFuture.complete(null);
+			return true;
+		}
+	};
+
+	/**
+	 * Registers an in-flight request.
+	 */
+	public void registerRequest() {
+		phaser.register();
+	}
+
+	/**
+	 * Deregisters an in-flight request.
+	 */
+	public void deregisterRequest() {
+		phaser.arriveAndDeregister();
+	}
+
+	/**
+	 * Returns a future that completes when the in-flight requests that were registered prior to
+	 * calling this method are deregistered.
+	 */
+	public CompletableFuture<Void> awaitAsync() {
+		phaser.arriveAndDeregister();
+		return terminationFuture;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
index e0c3fbec152..32b04921e42 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlers.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.async;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rest.NotFoundException;
@@ -29,24 +28,14 @@
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.FlinkException;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 
 /**
  * HTTP handlers for asynchronous operations.
@@ -176,7 +165,7 @@ protected StatusHandler(
 			final Either<Throwable, R> operationResultOrError;
 			try {
 				operationResultOrError = completedOperationCache.get(key);
-			} catch (UnknownOperationKey e) {
+			} catch (UnknownOperationKeyException e) {
 				return FutureUtils.completedExceptionally(
 					new NotFoundException("Operation not found under key: " + key, e));
 			}
@@ -194,6 +183,11 @@ protected StatusHandler(
 			}
 		}
 
+		@Override
+		public CompletableFuture<Void> closeHandlerAsync() {
+			return completedOperationCache.closeAsync();
+		}
+
 		/**
 		 * Extract the operation key under which the operation result future is stored.
 		 *
@@ -220,79 +214,4 @@ protected StatusHandler(
 		protected abstract V operationResultResponse(R operationResult);
 	}
 
-	/**
-	 * Cache to manage ongoing operations.
-	 *
-	 * <p>The cache allows to register ongoing operations by calling
-	 * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
-	 * {@code CompletableFuture} contains the operation result. Completed operations will be
-	 * removed from the cache automatically after a fixed timeout.
-	 */
-	@ThreadSafe
-	protected static class CompletedOperationCache<K, R> {
-
-		private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
-
-		/**
-		 * Stores SavepointKeys of ongoing savepoint.
-		 * If the savepoint completes, it will be moved to {@link #completedOperations}.
-		 */
-		private final Set<K> registeredOperationTriggers = ConcurrentHashMap.newKeySet();
-
-		/** Caches the location of completed operations. */
-		private final Cache<K, Either<Throwable, R>> completedOperations =
-			CacheBuilder.newBuilder()
-				.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
-				.build();
-
-		/**
-		 * Registers an ongoing operation with the cache.
-		 *
-		 * @param operationResultFuture A future containing the operation result.
-		 */
-		public void registerOngoingOperation(
-			final K operationKey,
-			final CompletableFuture<R> operationResultFuture) {
-			registeredOperationTriggers.add(operationKey);
-			operationResultFuture.whenComplete((savepointLocation, error) -> {
-				if (error == null) {
-					completedOperations.put(operationKey, Either.Right(savepointLocation));
-				} else {
-					completedOperations.put(operationKey, Either.Left(error));
-				}
-				registeredOperationTriggers.remove(operationKey);
-			});
-		}
-
-		/**
-		 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
-		 * finished, otherwise {@code null}.
-		 *
-		 * @throws UnknownOperationKey If the operation is not found, and there is no ongoing
-		 *                                   operation under the provided key.
-		 */
-		@Nullable
-		public Either<Throwable, R> get(
-			final K operationKey) throws UnknownOperationKey {
-			Either<Throwable, R> operationResultOrError = null;
-			if (!registeredOperationTriggers.contains(operationKey)
-				&& (operationResultOrError = completedOperations.getIfPresent(operationKey)) == null) {
-				throw new UnknownOperationKey(operationKey);
-			}
-			return operationResultOrError;
-		}
-	}
-
-	/**
-	 * Exception that indicates that there is no ongoing or completed savepoint for a given
-	 * {@link JobID} and {@link TriggerId} pair.
-	 */
-	static class UnknownOperationKey extends FlinkException {
-		private static final long serialVersionUID = 1L;
-
-		UnknownOperationKey(final Object operationKey) {
-			super("No ongoing operation for " + operationKey);
-		}
-	}
-
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
new file mode 100644
index 00000000000..95bb2239fb2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCache.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Cache to manage ongoing operations.
+ *
+ * <p>The cache allows to register ongoing operations by calling
+ * {@link #registerOngoingOperation(K, CompletableFuture)}, where the
+ * {@code CompletableFuture} contains the operation result. Completed operations will be
+ * removed from the cache automatically after a fixed timeout.
+ */
+@ThreadSafe
+class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {
+
+	private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;
+
+	private static final Logger LOGGER = LoggerFactory.getLogger(CompletedOperationCache.class);
+
+	/**
+	 * In-progress asynchronous operations.
+	 */
+	private final Map<K, ResultAccessTracker<R>> registeredOperationTriggers = new ConcurrentHashMap<>();
+
+	/**
+	 * Caches the result of completed operations.
+	 */
+	private final Cache<K, ResultAccessTracker<R>> completedOperations;
+
+	CompletedOperationCache() {
+		this(Ticker.systemTicker());
+	}
+
+	@VisibleForTesting
+	CompletedOperationCache(final Ticker ticker) {
+		completedOperations = CacheBuilder.newBuilder()
+			.expireAfterWrite(COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+			.removalListener((RemovalListener<K, ResultAccessTracker<R>>) removalNotification -> {
+				if (removalNotification.wasEvicted()) {
+					Preconditions.checkState(removalNotification.getKey() != null);
+					Preconditions.checkState(removalNotification.getValue() != null);
+
+					// When shutting down the cache, we wait until all results are accessed.
+					// When a result gets evicted from the cache, it will not be possible to access
+					// it any longer, and we might be in the process of shutting down, so we mark
+					// the result as accessed to avoid waiting indefinitely.
+					removalNotification.getValue().markAccessed();
+
+					LOGGER.info("Evicted result with trigger id {} because its TTL of {}s has expired.",
+						removalNotification.getKey().getTriggerId(),
+						COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS);
+				}
+			})
+			.ticker(ticker)
+			.build();
+	}
+
+	/**
+	 * Registers an ongoing operation with the cache.
+	 *
+	 * @param operationResultFuture A future containing the operation result.
+	 */
+	public void registerOngoingOperation(
+			final K operationKey,
+			final CompletableFuture<R> operationResultFuture) {
+		final ResultAccessTracker<R> inProgress = ResultAccessTracker.inProgress();
+		registeredOperationTriggers.put(operationKey, inProgress);
+		operationResultFuture.whenComplete((result, error) -> {
+			if (error == null) {
+				completedOperations.put(operationKey, inProgress.finishOperation(Either.Right(result)));
+			} else {
+				completedOperations.put(operationKey, inProgress.finishOperation(Either.Left(error)));
+			}
+			registeredOperationTriggers.remove(operationKey);
+		});
+	}
+
+	/**
+	 * Returns the operation result or a {@code Throwable} if the {@code CompletableFuture}
+	 * finished, otherwise {@code null}.
+	 *
+	 * @throws UnknownOperationKeyException If the operation is not found, and there is no ongoing
+	 *                                      operation under the provided key.
+	 */
+	@Nullable
+	public Either<Throwable, R> get(
+			final K operationKey) throws UnknownOperationKeyException {
+		ResultAccessTracker<R> resultAccessTracker;
+		if ((resultAccessTracker = registeredOperationTriggers.get(operationKey)) == null
+			&& (resultAccessTracker = completedOperations.getIfPresent(operationKey)) == null) {
+			throw new UnknownOperationKeyException(operationKey);
+		}
+
+		return resultAccessTracker.accessOperationResultOrError();
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		return FutureUtils.orTimeout(
+			asyncWaitForResultsToBeAccessed(),
+			COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS,
+			TimeUnit.SECONDS);
+	}
+
+	private CompletableFuture<Void> asyncWaitForResultsToBeAccessed() {
+		return FutureUtils.waitForAll(
+			Stream.concat(registeredOperationTriggers.values().stream(), completedOperations.asMap().values().stream())
+				.map(ResultAccessTracker::getAccessedFuture)
+				.collect(Collectors.toList()));
+	}
+
+	@VisibleForTesting
+	void cleanUp() {
+		completedOperations.cleanUp();
+	}
+
+	/**
+	 * Stores the result of an asynchronous operation, and tracks accesses to it.
+	 */
+	private static class ResultAccessTracker<R> {
+
+		/** Result of an asynchronous operation. Null if operation is in progress. */
+		@Nullable
+		private final Either<Throwable, R> operationResultOrError;
+
+		/** Future that completes if a non-null {@link #operationResultOrError} is accessed. */
+		private final CompletableFuture<Void> accessed;
+
+		private static <R> ResultAccessTracker<R> inProgress() {
+			return new ResultAccessTracker<>();
+		}
+
+		private ResultAccessTracker() {
+			this.operationResultOrError = null;
+			this.accessed = new CompletableFuture<>();
+		}
+
+		private ResultAccessTracker(final Either<Throwable, R> operationResultOrError, final CompletableFuture<Void> accessed) {
+			this.operationResultOrError = checkNotNull(operationResultOrError);
+			this.accessed = checkNotNull(accessed);
+		}
+
+		/**
+		 * Creates a new instance of the tracker with the result of the asynchronous operation set.
+		 */
+		public ResultAccessTracker<R> finishOperation(final Either<Throwable, R> operationResultOrError) {
+			checkState(this.operationResultOrError == null);
+
+			return new ResultAccessTracker<>(checkNotNull(operationResultOrError), this.accessed);
+		}
+
+		/**
+		 * If present, returns the result of the asynchronous operation, and marks the result as
+		 * accessed. If the result is not present, this method returns null.
+		 */
+		@Nullable
+		public Either<Throwable, R> accessOperationResultOrError() {
+			if (operationResultOrError != null) {
+				markAccessed();
+			}
+			return operationResultOrError;
+		}
+
+		public CompletableFuture<Void> getAccessedFuture() {
+			return accessed;
+		}
+
+		private void markAccessed() {
+			accessed.complete(null);
+		}
+
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
new file mode 100644
index 00000000000..962b80fe5e6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/async/UnknownOperationKeyException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception that indicates that there is no ongoing or completed savepoint for a given
+ * {@link JobID} and {@link TriggerId} pair.
+ */
+class UnknownOperationKeyException extends FlinkException {
+	private static final long serialVersionUID = 1L;
+
+	UnknownOperationKeyException(final Object operationKey) {
+		super("No ongoing operation for " + operationKey);
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
index 4bb473e9ceb..e773702274f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AsynchronousJobOperationKey.java
@@ -33,7 +33,7 @@
  * A pair of {@link JobID} and {@link TriggerId} used as a key to a hash based
  * collection.
  *
- * @see AbstractAsynchronousOperationHandlers.CompletedOperationCache
+ * @see AbstractAsynchronousOperationHandlers
  */
 @Immutable
 public class AsynchronousJobOperationKey extends OperationKey {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
index 4c7ac9406b1..8a20868ce37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
@@ -24,7 +24,7 @@
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException;
-import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.AbstractHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index b407ada46e6..b60afbb5948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -45,6 +45,7 @@
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -69,7 +70,7 @@
 	 * @param headers additional header values
 	 * @param <P> type of the response
 	 */
-	public static <P extends ResponseBody> void sendResponse(
+	public static <P extends ResponseBody> CompletableFuture<Void> sendResponse(
 			ChannelHandlerContext channelHandlerContext,
 			HttpRequest httpRequest,
 			P response,
@@ -80,15 +81,14 @@
 			mapper.writeValue(sw, response);
 		} catch (IOException ioe) {
 			LOG.error("Internal server error. Could not map response to JSON.", ioe);
-			sendErrorResponse(
+			return sendErrorResponse(
 				channelHandlerContext,
 				httpRequest,
 				new ErrorResponseBody("Internal server error. Could not map response to JSON."),
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				headers);
-			return;
 		}
-		sendResponse(
+		return sendResponse(
 			channelHandlerContext,
 			httpRequest,
 			sw.toString(),
@@ -105,14 +105,14 @@
 	 * @param statusCode of the message to send
 	 * @param headers additional header values
 	 */
-	public static void sendErrorResponse(
+	public static CompletableFuture<Void> sendErrorResponse(
 			ChannelHandlerContext channelHandlerContext,
 			HttpRequest httpRequest,
 			ErrorResponseBody errorMessage,
 			HttpResponseStatus statusCode,
 			Map<String, String> headers) {
 
-		sendErrorResponse(
+		return sendErrorResponse(
 			channelHandlerContext,
 			HttpHeaders.isKeepAlive(httpRequest),
 			errorMessage,
@@ -129,7 +129,7 @@ public static void sendErrorResponse(
 	 * @param statusCode of the message to send
 	 * @param headers additional header values
 	 */
-	public static void sendErrorResponse(
+	public static CompletableFuture<Void> sendErrorResponse(
 			ChannelHandlerContext channelHandlerContext,
 			boolean keepAlive,
 			ErrorResponseBody errorMessage,
@@ -142,14 +142,14 @@ public static void sendErrorResponse(
 		} catch (IOException e) {
 			// this should never happen
 			LOG.error("Internal server error. Could not map error response to JSON.", e);
-			sendResponse(
+			return sendResponse(
 				channelHandlerContext,
 				keepAlive,
 				"Internal server error. Could not map error response to JSON.",
 				HttpResponseStatus.INTERNAL_SERVER_ERROR,
 				headers);
 		}
-		sendResponse(
+		return sendResponse(
 			channelHandlerContext,
 			keepAlive,
 			sw.toString(),
@@ -166,14 +166,14 @@ public static void sendErrorResponse(
 	 * @param statusCode of the message to send
 	 * @param headers additional header values
 	 */
-	public static void sendResponse(
+	public static CompletableFuture<Void> sendResponse(
 			@Nonnull ChannelHandlerContext channelHandlerContext,
 			@Nonnull HttpRequest httpRequest,
 			@Nonnull String message,
 			@Nonnull HttpResponseStatus statusCode,
 			@Nonnull Map<String, String> headers) {
 
-		sendResponse(
+		return sendResponse(
 			channelHandlerContext,
 			HttpHeaders.isKeepAlive(httpRequest),
 			message,
@@ -190,7 +190,7 @@ public static void sendResponse(
 	 * @param statusCode of the message to send
 	 * @param headers additional header values
 	 */
-	public static void sendResponse(
+	public static CompletableFuture<Void> sendResponse(
 			@Nonnull ChannelHandlerContext channelHandlerContext,
 			boolean keepAlive,
 			@Nonnull String message,
@@ -223,5 +223,19 @@ public static void sendResponse(
 		if (!keepAlive) {
 			lastContentFuture.addListener(ChannelFutureListener.CLOSE);
 		}
+
+		return toCompletableFuture(lastContentFuture);
+	}
+
+	private static CompletableFuture<Void> toCompletableFuture(final ChannelFuture channelFuture) {
+		final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+		channelFuture.addListener(future -> {
+			if (future.isSuccess()) {
+				completableFuture.complete(null);
+			} else {
+				completableFuture.completeExceptionally(future.cause());
+			}
+		});
+		return completableFuture;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
index c289634bc07..cc116dd7714 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStoreTest.java
@@ -27,6 +27,7 @@
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.ManualTicker;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
@@ -281,20 +282,6 @@ private FileArchivedExecutionGraphStore createDefaultExecutionGraphStore(File st
 			Ticker.systemTicker());
 	}
 
-	private static final class ManualTicker extends Ticker {
-
-		private long currentTime = 0;
-
-		@Override
-		public long read() {
-			return currentTime;
-		}
-
-		void advanceTime(long duration, TimeUnit timeUnit) {
-			currentTime += timeUnit.toNanos(duration);
-		}
-	}
-
 	private static final class PartialArchivedExecutionGraphMatcher extends BaseMatcher<ArchivedExecutionGraph> {
 
 		private final ArchivedExecutionGraph archivedExecutionGraph;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
index 430bfad38fd..0c28745518e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestServerEndpointITCase.java
@@ -91,14 +91,18 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.CoreMatchers.hasItems;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -131,6 +135,8 @@
 	private SSLContext defaultSSLContext;
 	private SSLSocketFactory defaultSSLSocketFactory;
 
+	private TestHandler testHandler;
+
 	public RestServerEndpointITCase(final Configuration config) {
 		this.config = requireNonNull(config);
 	}
@@ -194,7 +200,7 @@ public void setup() throws Exception {
 		final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () ->
 			CompletableFuture.completedFuture(mockRestfulGateway);
 
-		TestHandler testHandler = new TestHandler(
+		testHandler = new TestHandler(
 			CompletableFuture.completedFuture(restAddress),
 			mockGatewayRetriever,
 			RpcUtils.INF_TIMEOUT);
@@ -253,7 +259,7 @@ public void teardown() throws Exception {
 		}
 
 		if (serverEndpoint != null) {
-			serverEndpoint.close();
+			serverEndpoint.closeAsync().get(timeout.getSize(), timeout.getUnit());
 			serverEndpoint = null;
 		}
 	}
@@ -264,37 +270,25 @@ public void teardown() throws Exception {
 	 */
 	@Test
 	public void testRequestInterleaving() throws Exception {
-
-		TestParameters parameters = new TestParameters();
-		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		testHandler.handlerBody = id -> {
+			if (id == 1) {
+				handlerBlocker.arriveAndBlock();
+			}
+			return CompletableFuture.completedFuture(new TestResponse(id));
+		};
 
 		// send first request and wait until the handler blocks
-		CompletableFuture<TestResponse> response1;
-
-		synchronized (TestHandler.LOCK) {
-			response1 = restClient.sendRequest(
-				serverAddress.getHostName(),
-				serverAddress.getPort(),
-				new TestHeaders(),
-				parameters,
-				new TestRequest(1));
-			TestHandler.LOCK.wait();
-		}
+		final CompletableFuture<TestResponse> response1 = sendRequestToTestHandler(new TestRequest(1));
+		handlerBlocker.awaitRequestToArrive();
 
 		// send second request and verify response
-		CompletableFuture<TestResponse> response2 = restClient.sendRequest(
-			serverAddress.getHostName(),
-			serverAddress.getPort(),
-			new TestHeaders(),
-			parameters,
-			new TestRequest(2));
+		final CompletableFuture<TestResponse> response2 = sendRequestToTestHandler(new TestRequest(2));
 		assertEquals(2, response2.get().id);
 
 		// wake up blocked handler
-		synchronized (TestHandler.LOCK) {
-			TestHandler.LOCK.notifyAll();
-		}
+		handlerBlocker.unblockRequest();
+
 		// verify response to first request
 		assertEquals(1, response1.get().id);
 	}
@@ -335,41 +329,34 @@ public void testBadHandlerRequest() throws Exception {
 	}
 
 	/**
-	 * Tests that requests and responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH}
-	 * are rejected by the server and client, respectively.
+	 * Tests that requests larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected.
 	 */
 	@Test
-	public void testMaxContentLengthLimit() throws Exception {
-		final TestParameters parameters = new TestParameters();
-		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
-		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
-
-		CompletableFuture<TestResponse> response;
-		response = restClient.sendRequest(
-			serverAddress.getHostName(),
-			serverAddress.getPort(),
-			new TestHeaders(),
-			parameters,
-			new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+	public void testShouldRespectMaxContentLengthLimitForRequests() throws Exception {
+		testHandler.handlerBody = id -> {
+			throw new AssertionError("Request should not arrive at server.");
+		};
 
 		try {
-			response.get();
+			sendRequestToTestHandler(new TestRequest(2, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH))).get();
 			fail("Expected exception not thrown");
 		} catch (final ExecutionException e) {
 			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
 			assertThat(throwable, instanceOf(RestClientException.class));
 			assertThat(throwable.getMessage(), containsString("Try to raise"));
 		}
+	}
 
-		response = restClient.sendRequest(
-			serverAddress.getHostName(),
-			serverAddress.getPort(),
-			new TestHeaders(),
-			parameters,
-			new TestRequest(TestHandler.LARGE_RESPONSE_BODY_ID));
+	/**
+	 * Tests that responses larger than {@link #TEST_REST_MAX_CONTENT_LENGTH} are rejected.
+	 */
+	@Test
+	public void testShouldRespectMaxContentLengthLimitForResponses() throws Exception {
+		testHandler.handlerBody = id -> CompletableFuture.completedFuture(
+			new TestResponse(id, createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
 
 		try {
-			response.get();
+			sendRequestToTestHandler(new TestRequest(1)).get();
 			fail("Expected exception not thrown");
 		} catch (final ExecutionException e) {
 			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
@@ -545,6 +532,43 @@ public void testNonSslRedirectForEnabledSsl() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that after calling {@link RestServerEndpoint#closeAsync()}, the handlers are closed
+	 * first, and we wait for in-flight requests to finish. As long as not all handlers are closed,
+	 * HTTP requests should be served.
+	 */
+	@Test
+	public void testShouldWaitForHandlersWhenClosing() throws Exception {
+		testHandler.closeFuture = new CompletableFuture<>();
+		final HandlerBlocker handlerBlocker = new HandlerBlocker(timeout);
+		testHandler.handlerBody = id -> {
+			// Intentionally schedule the work on a different thread. This is to simulate
+			// handlers where the CompletableFuture is finished by the RPC framework.
+			return CompletableFuture.supplyAsync(() -> {
+				handlerBlocker.arriveAndBlock();
+				return new TestResponse(id);
+			});
+		};
+
+		// Initiate closing RestServerEndpoint but the test handler should block.
+		final CompletableFuture<Void> closeRestServerEndpointFuture = serverEndpoint.closeAsync();
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		final CompletableFuture<TestResponse> request = sendRequestToTestHandler(new TestRequest(1));
+		handlerBlocker.awaitRequestToArrive();
+
+		// Allow handler to close but there is still one in-flight request which should prevent
+		// the RestServerEndpoint from closing.
+		testHandler.closeFuture.complete(null);
+		assertThat(closeRestServerEndpointFuture.isDone(), is(false));
+
+		// Finish the in-flight request.
+		handlerBlocker.unblockRequest();
+
+		request.get(timeout.getSize(), timeout.getUnit());
+		closeRestServerEndpointFuture.get(timeout.getSize(), timeout.getUnit());
+	}
+
 	private HttpURLConnection openHttpConnectionForUpload(final String boundary) throws IOException {
 		final HttpURLConnection connection =
 			(HttpURLConnection) new URL(serverEndpoint.getRestBaseUrl() + "/upload").openConnection();
@@ -587,9 +611,9 @@ protected void startInternal() {}
 
 	private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
 
-		private static final Object LOCK = new Object();
+		private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
 
-		private static final int LARGE_RESPONSE_BODY_ID = 3;
+		private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
 
 		TestHandler(
 				CompletableFuture<String> localAddressFuture,
@@ -604,25 +628,89 @@ protected void startInternal() {}
 		}
 
 		@Override
-		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) throws RestHandlerException {
+		protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest, TestParameters> request, RestfulGateway gateway) {
 			assertEquals(request.getPathParameter(JobIDPathParameter.class), PATH_JOB_ID);
 			assertEquals(request.getQueryParameter(JobIDQueryParameter.class).get(0), QUERY_JOB_ID);
 
 			final int id = request.getRequestBody().id;
-			if (id == 1) {
-				synchronized (LOCK) {
-					try {
-						LOCK.notifyAll();
-						LOCK.wait();
-					} catch (InterruptedException ignored) {
-					}
-				}
-			} else if (id == LARGE_RESPONSE_BODY_ID) {
-				return CompletableFuture.completedFuture(new TestResponse(
-					id,
-					createStringOfSize(TEST_REST_MAX_CONTENT_LENGTH)));
+			return handlerBody.apply(id);
+		}
+
+		@Override
+		public CompletableFuture<Void> closeHandlerAsync() {
+			return closeFuture;
+		}
+	}
+
+	private CompletableFuture<TestResponse> sendRequestToTestHandler(final TestRequest testRequest) {
+		try {
+			return restClient.sendRequest(
+				serverAddress.getHostName(),
+				serverAddress.getPort(),
+				new TestHeaders(),
+				createTestParameters(),
+				testRequest);
+		} catch (final IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	private static TestParameters createTestParameters() {
+		final TestParameters parameters = new TestParameters();
+		parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
+		parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
+		return parameters;
+	}
+
+	/**
+	 * This is a helper class for tests that require to have fine-grained control over HTTP
+	 * requests so that they are not dispatched immediately.
+	 */
+	private static class HandlerBlocker {
+
+		private final Time timeout;
+
+		private final CountDownLatch requestArrivedLatch = new CountDownLatch(1);
+
+		private final CountDownLatch finishRequestLatch = new CountDownLatch(1);
+
+		private HandlerBlocker(final Time timeout) {
+			this.timeout = checkNotNull(timeout);
+		}
+
+		/**
+		 * Waits until {@link #arriveAndBlock()} is called.
+		 */
+		public void awaitRequestToArrive() {
+			try {
+				assertTrue(requestArrivedLatch.await(timeout.getSize(), timeout.getUnit()));
+			} catch (final InterruptedException e) {
+				Thread.currentThread().interrupt();
 			}
-			return CompletableFuture.completedFuture(new TestResponse(id));
+		}
+
+		/**
+		 * Signals that the request arrived. This method blocks until {@link #unblockRequest()} is
+		 * called.
+		 */
+		public void arriveAndBlock() {
+			markRequestArrived();
+			try {
+				assertTrue(finishRequestLatch.await(timeout.getSize(), timeout.getUnit()));
+			} catch (final InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		}
+
+		/**
+		 * @see #arriveAndBlock()
+		 */
+		public void unblockRequest() {
+			finishRequestLatch.countDown();
+		}
+
+		private void markRequestArrived() {
+			requestArrivedLatch.countDown();
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
similarity index 96%
rename from flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
rename to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
index 607c1c4bb06..ebb6656fdcd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/AbstractHandlerTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.rest;
+package org.apache.flink.runtime.rest.handler;
 
-import org.apache.flink.runtime.rest.handler.FileUploads;
-import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.router.RouteResult;
 import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
new file mode 100644
index 00000000000..c486571d9cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/InFlightRequestTrackerTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link InFlightRequestTracker}.
+ */
+public class InFlightRequestTrackerTest {
+
+	private InFlightRequestTracker inFlightRequestTracker;
+
+	@Before
+	public void setUp() {
+		inFlightRequestTracker = new InFlightRequestTracker();
+	}
+
+	@Test
+	public void testShouldFinishAwaitAsyncImmediatelyIfNoRequests() {
+		assertTrue(inFlightRequestTracker.awaitAsync().isDone());
+	}
+
+	@Test
+	public void testShouldFinishAwaitAsyncIffAllRequestsDeregistered() {
+		inFlightRequestTracker.registerRequest();
+
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+		assertFalse(awaitFuture.isDone());
+
+		inFlightRequestTracker.deregisterRequest();
+		assertTrue(awaitFuture.isDone());
+	}
+
+	@Test
+	public void testAwaitAsyncIsIdempotent() {
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+		assertTrue(awaitFuture.isDone());
+
+		assertSame(
+			"The reference to the future must not change",
+			awaitFuture,
+			inFlightRequestTracker.awaitAsync());
+	}
+
+	@Test
+	public void testShouldTolerateRegisterAfterAwaitAsync() {
+		final CompletableFuture<Void> awaitFuture = inFlightRequestTracker.awaitAsync();
+		assertTrue(awaitFuture.isDone());
+
+		inFlightRequestTracker.registerRequest();
+
+		assertSame(
+			"The reference to the future must not change",
+			awaitFuture,
+			inFlightRequestTracker.awaitAsync());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
index eeb41a829b8..fb384a938d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/AbstractAsynchronousOperationHandlersTest.java
@@ -179,6 +179,32 @@ public void testUnknownTriggerId() throws Exception {
 		}
 	}
 
+	/**
+	 * Tests that the future returned by {@link AbstractAsynchronousOperationHandlers.StatusHandler#closeAsync()}
+	 * completes when the result of the asynchronous operation is served.
+	 */
+	@Test
+	public void testCloseShouldFinishOnFirstServedResult() throws Exception {
+		final CompletableFuture<String> savepointFuture = new CompletableFuture<>();
+		final TestingRestfulGateway testingRestfulGateway = new TestingRestfulGateway.Builder()
+			.setTriggerSavepointFunction((JobID jobId, String directory) -> savepointFuture)
+			.build();
+
+		final TriggerId triggerId = testingTriggerHandler.handleRequest(
+			triggerOperationRequest(),
+			testingRestfulGateway).get().getTriggerId();
+		final CompletableFuture<Void> closeFuture = testingStatusHandler.closeAsync();
+
+		testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+		assertThat(closeFuture.isDone(), is(false));
+
+		savepointFuture.complete("foobar");
+		testingStatusHandler.handleRequest(statusOperationRequest(triggerId), testingRestfulGateway).get();
+
+		assertThat(closeFuture.isDone(), is(true));
+	}
+
 	private static HandlerRequest<EmptyRequestBody, EmptyMessageParameters> triggerOperationRequest() throws HandlerRequestException {
 		return new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance());
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
new file mode 100644
index 00000000000..2d13780f956
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/async/CompletedOperationCacheTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.async;
+
+import org.apache.flink.runtime.rest.messages.TriggerId;
+import org.apache.flink.runtime.util.ManualTicker;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link CompletedOperationCache}.
+ */
+public class CompletedOperationCacheTest extends TestLogger {
+
+	private static final OperationKey TEST_OPERATION_KEY = new OperationKey(new TriggerId());
+
+	private static final CompletableFuture<String> TEST_OPERATION_RESULT = CompletableFuture.completedFuture("foo");
+
+	private ManualTicker manualTicker;
+
+	private CompletedOperationCache<OperationKey, String> completedOperationCache;
+
+	@Before
+	public void setUp() {
+		manualTicker = new ManualTicker();
+		completedOperationCache = new CompletedOperationCache<>(manualTicker);
+	}
+
+	@Test
+	public void testShouldFinishClosingCacheIfAllResultsAreEvicted() {
+		completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+		final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+		assertThat(closeCacheFuture.isDone(), is(false));
+
+		manualTicker.advanceTime(300, TimeUnit.SECONDS);
+		completedOperationCache.cleanUp();
+
+		assertThat(closeCacheFuture.isDone(), is(true));
+	}
+
+	@Test
+	public void testShouldFinishClosingCacheIfAllResultsAccessed() throws Exception {
+		completedOperationCache.registerOngoingOperation(TEST_OPERATION_KEY, TEST_OPERATION_RESULT);
+		final CompletableFuture<Void> closeCacheFuture = completedOperationCache.closeAsync();
+		assertThat(closeCacheFuture.isDone(), is(false));
+
+		final Either<Throwable, String> operationResultOrError = completedOperationCache.get(TEST_OPERATION_KEY);
+
+		assertThat(operationResultOrError, is(notNullValue()));
+		assertThat(operationResultOrError.right(), is(equalTo(TEST_OPERATION_RESULT.get())));
+		assertThat(closeCacheFuture.isDone(), is(true));
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
new file mode 100644
index 00000000000..d2e2e1d7bea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ManualTicker.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Controllable {@link Ticker} implementation for tests.
+ */
+public final class ManualTicker extends Ticker {
+
+	private long currentTime;
+
+	@Override
+	public long read() {
+		return currentTime;
+	}
+
+	public void advanceTime(final long duration, final TimeUnit timeUnit) {
+		currentTime += timeUnit.toNanos(duration);
+	}
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index afca8f12100..c5ec1b5f914 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -33,6 +33,7 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.dispatcher.DispatcherId;
@@ -249,7 +250,7 @@ public Long map(Long value) throws Exception {
 				clusterClient.shutdown();
 			}
 			if (dispatcherResourceManagerComponent != null) {
-				dispatcherResourceManagerComponent.close();
+				dispatcherResourceManagerComponent.deregisterApplicationAndClose(ApplicationStatus.SUCCEEDED, null);
 			}
 
 			fatalErrorHandler.rethrowError();
diff --git a/flink-yarn-tests/src/test/resources/log4j-test.properties b/flink-yarn-tests/src/test/resources/log4j-test.properties
index 8f56c1fa9a5..42ae7ddf08e 100644
--- a/flink-yarn-tests/src/test/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/test/resources/log4j-test.properties
@@ -35,7 +35,7 @@ log4j.logger.org.apache.flink.runtime.leaderelection=INFO
 log4j.logger.org.apache.flink.runtime.leaderretrieval=INFO
 
 log4j.logger.org.apache.directory=OFF
-log4j.logger.org.mortbay.log=OFF, testlogger
+log4j.logger.org.mortbay.log=OFF
 log4j.logger.net.sf.ehcache=OFF
 log4j.logger.org.apache.hadoop.metrics2=OFF
 log4j.logger.org.apache.hadoop.conf.Configuration=OFF


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services