You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/26 09:25:51 UTC

[1/2] flink git commit: [FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException

Repository: flink
Updated Branches:
  refs/heads/master e9decac62 -> d3458ec7b


http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index 7315f65..1c286fa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,6 +36,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -180,7 +180,7 @@ public class StreamElementQueueTest extends TestLogger {
 				try {
 					queue.put(streamRecordQueueEntry2);
 				} catch (InterruptedException e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);
@@ -220,7 +220,7 @@ public class StreamElementQueueTest extends TestLogger {
 				try {
 					return queue.peekBlockingly();
 				} catch (InterruptedException e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);
@@ -244,7 +244,7 @@ public class StreamElementQueueTest extends TestLogger {
 				try {
 					return queue.poll();
 				} catch (InterruptedException e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index acc6b8e..73bdcc5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -104,7 +104,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 				try {
 					return queue.poll();
 				} catch (InterruptedException e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);
@@ -125,7 +125,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 				try {
 					return queue.poll();
 				} catch (InterruptedException e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);
@@ -171,7 +171,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
 				try {
 					return queue.poll();
 				} catch (InterruptedException e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);


[2/2] flink git commit: [FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException

Posted by tr...@apache.org.
[FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException

FlinkFutureException was introduced to fail a CompletableFuture callback. However, the JDK
already provides a class for this purpose, java.util.concurrent.CompletionException, which
allows failures in different stages to be handled better. Therefore we replace
FlinkFutureException by CompletionException and remove the former.

This closes #4701.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3458ec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3458ec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3458ec7

Branch: refs/heads/master
Commit: d3458ec7ba0a8e61d07dc5330bda8a1f81de81fe
Parents: e9decac
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 21 19:21:07 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 26 11:25:42 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/handlers/JarDeleteHandler.java   |  5 ++-
 .../webmonitor/handlers/JarListHandler.java     |  5 ++-
 .../webmonitor/handlers/JarPlanHandler.java     |  4 +-
 .../webmonitor/handlers/JarRunHandler.java      |  5 ++-
 .../runtime/akka/AkkaJobManagerGateway.java     | 19 ++++----
 .../concurrent/FlinkFutureException.java        | 47 --------------------
 .../runtime/jobmaster/JobManagerRunner.java     |  5 ++-
 .../resourcemanager/ResourceManagerRunner.java  |  6 ++-
 .../AbstractExecutionGraphRequestHandler.java   |  4 +-
 .../handler/legacy/ClusterOverviewHandler.java  |  4 +-
 .../handler/legacy/CurrentJobIdsHandler.java    |  5 ++-
 .../legacy/CurrentJobsOverviewHandler.java      |  5 ++-
 .../handler/legacy/JobAccumulatorsHandler.java  |  5 ++-
 .../handler/legacy/JobCancellationHandler.java  |  5 ++-
 .../JobCancellationWithSavepointHandlers.java   | 11 ++---
 .../rest/handler/legacy/JobConfigHandler.java   |  5 ++-
 .../rest/handler/legacy/JobDetailsHandler.java  |  5 ++-
 .../handler/legacy/JobExceptionsHandler.java    |  5 ++-
 .../handler/legacy/JobManagerConfigHandler.java |  5 ++-
 .../rest/handler/legacy/JobStoppingHandler.java |  5 ++-
 .../legacy/JobVertexAccumulatorsHandler.java    |  5 ++-
 .../handler/legacy/JobVertexDetailsHandler.java |  5 ++-
 .../legacy/JobVertexTaskManagersHandler.java    |  5 ++-
 ...taskExecutionAttemptAccumulatorsHandler.java |  5 ++-
 .../SubtaskExecutionAttemptDetailsHandler.java  |  5 ++-
 .../legacy/SubtasksAllAccumulatorsHandler.java  |  5 ++-
 .../handler/legacy/SubtasksTimesHandler.java    |  5 ++-
 .../handler/legacy/TaskManagerLogHandler.java   | 11 ++---
 .../handler/legacy/TaskManagersHandler.java     |  7 +--
 .../checkpoints/CheckpointConfigHandler.java    |  5 ++-
 .../CheckpointStatsDetailsHandler.java          |  5 ++-
 .../checkpoints/CheckpointStatsHandler.java     |  5 ++-
 .../legacy/metrics/AbstractMetricsHandler.java  |  5 ++-
 .../runtime/blob/BlobServerDeleteTest.java      |  6 ++-
 .../flink/runtime/blob/BlobServerGetTest.java   | 17 ++++---
 .../flink/runtime/blob/BlobServerPutTest.java   |  6 ++-
 .../runtime/concurrent/FutureUtilsTest.java     |  5 ++-
 .../slotmanager/SlotManagerTest.java            |  5 ++-
 .../runtime/rpc/FencedRpcEndpointTest.java      |  5 ++-
 .../queue/OrderedStreamElementQueueTest.java    |  4 +-
 .../async/queue/StreamElementQueueTest.java     |  8 ++--
 .../queue/UnorderedStreamElementQueueTest.java  |  8 ++--
 42 files changed, 146 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 04f663d..cf49b07 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -30,6 +30,7 @@ import java.io.FilenameFilter;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -80,7 +81,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 					return writer.toString();
 				}
 				catch (Exception e) {
-					throw new FlinkFutureException("Failed to delete jar id " + pathParams.get("jarid") + '.', e);
+					throw new CompletionException(new FlinkException("Failed to delete jar id " + pathParams.get("jarid") + '.', e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4248dd4..746870b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
@@ -140,7 +141,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 					return writer.toString();
 				}
 				catch (Exception e) {
-					throw new FlinkFutureException("Failed to fetch jar list.", e);
+					throw new CompletionException(new FlinkException("Failed to fetch jar list.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 4d79492..aa7c056 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -30,6 +29,7 @@ import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -65,7 +65,7 @@ public class JarPlanHandler extends JarActionHandler {
 					return writer.toString();
 				}
 				catch (Exception e) {
-					throw new FlinkFutureException(e);
+					throw new CompletionException(e);
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 16a1565..debdfa1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -24,10 +24,10 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -86,7 +87,7 @@ public class JarRunHandler extends JarActionHandler {
 					gen.close();
 					return writer.toString();
 				} catch (Exception e) {
-					throw new FlinkFutureException("Could not run the jar.", e);
+					throw new CompletionException(new FlinkException("Could not run the jar.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 23ba6b3..18025dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.akka;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -36,12 +35,14 @@ import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 import scala.Option;
 import scala.reflect.ClassTag$;
@@ -101,15 +102,15 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 						if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
 							return Acknowledge.get();
 						} else {
-							throw new FlinkFutureException("JobManager responded for wrong Job. This Job: " +
-								jobGraph.getJobID() + ", response: " + success.jobId());
+							throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
+								jobGraph.getJobID() + ", response: " + success.jobId()));
 						}
 					} else if (response instanceof JobManagerMessages.JobResultFailure) {
 						JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);
 
-						throw new FlinkFutureException("Job submission failed.", failure.cause());
+						throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
 					} else {
-						throw new FlinkFutureException("Unknown response to SubmitJob message: " + response + '.');
+						throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
 					}
 				}
 			);
@@ -127,7 +128,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 				if (response instanceof JobManagerMessages.CancellationSuccess) {
 					return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
 				} else {
-					throw new FlinkFutureException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause());
+					throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
 				}
 			});
 	}
@@ -144,7 +145,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 				if (response instanceof JobManagerMessages.CancellationSuccess) {
 					return Acknowledge.get();
 				} else {
-					throw new FlinkFutureException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause());
+					throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
 				}
 			});
 	}
@@ -161,7 +162,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 				if (response instanceof JobManagerMessages.StoppingSuccess) {
 					return Acknowledge.get();
 				} else {
-					throw new FlinkFutureException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause());
+					throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
 				}
 			});
 	}
@@ -211,7 +212,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 					} else if (response instanceof JobManagerMessages.JobNotFound) {
 						return Optional.empty();
 					} else {
-						throw new FlinkFutureException("Unknown response: " + response + '.');
+						throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
 					}
 				});
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
deleted file mode 100644
index c728904..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-import org.apache.flink.util.FlinkRuntimeException;
-
-import java.util.concurrent.CompletionStage;
-
-/**
- * Base class for exceptions which are thrown in {@link CompletionStage}.
- *
- * <p>The exception has to extend {@link FlinkRuntimeException} because only
- * unchecked exceptions can be thrown in a future's stage. Additionally we let
- * it extend the Flink runtime exception because it designates the exception to
- * come from a Flink stage.
- */
-public class FlinkFutureException extends FlinkRuntimeException {
-	private static final long serialVersionUID = -8878194471694178210L;
-
-	public FlinkFutureException(String message) {
-		super(message);
-	}
-
-	public FlinkFutureException(Throwable cause) {
-		super(cause);
-	}
-
-	public FlinkFutureException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 0bf0cc2..70abf2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -223,7 +224,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 						}
 
 						if (exception != null) {
-							throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.", exception);
+							throw new CompletionException(new FlinkException("Could not properly shut down the JobManagerRunner.", exception));
 						}
 					});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index ed6e18c..caa3ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -20,17 +20,19 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 /**
  * Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
@@ -107,7 +109,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 						try {
 							resourceManagerRuntimeServices.shutDown();
 						} catch (Exception e) {
-							throw new FlinkFutureException("Could not properly shut down the resource manager runtime services.", e);
+							throw new CompletionException(new FlinkException("Could not properly shut down the resource manager runtime services.", e));
 						}
 					});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
index e214a36..88932b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -30,6 +29,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -70,7 +70,7 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 				if (optGraph.isPresent()) {
 					return handleRequest(optGraph.get(), pathParams);
 				} else {
-					throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
+					throw new CompletionException(new NotFoundException("Could not find job with jobId " + jid + '.'));
 				}
 			}, executor);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 794ff20..2f883ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -39,6 +38,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
@@ -96,7 +96,7 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implement
 							gen.close();
 							return writer.toString();
 						} catch (IOException exception) {
-							throw new FlinkFutureException("Could not write cluster overview.", exception);
+							throw new CompletionException(new FlinkException("Could not write cluster overview.", exception));
 						}
 					},
 					executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index 07d9707..b806f86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -20,15 +20,16 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -104,7 +105,7 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 					}
 				}
 				catch (Exception e) {
-					throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
+					throw new CompletionException(new FlinkException("Failed to fetch list of all running jobs.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
index b1939e5..669ef32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -42,6 +42,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -129,7 +130,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler imple
 						gen.close();
 						return writer.toString();
 					} catch (IOException e) {
-						throw new FlinkFutureException("Could not write current jobs overview json.", e);
+						throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
 					}
 				},
 				executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
index 0a3b050..68810eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -57,7 +58,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 				try {
 					return createJobAccumulatorsJson(graph);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job accumulators json.", e);
+					throw new CompletionException(new FlinkException("Could not create job accumulators json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
index a194f30..9e9849f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -64,7 +65,7 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
 					}
 				}
 				catch (Exception e) {
-					throw new FlinkFutureException("Failed to cancel the job with id: "  + pathParams.get("jobid"), e);
+					throw new CompletionException(new FlinkException("Failed to cancel the job with id: "  + pathParams.get("jobid"), e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
index 23e94f5..fe71a52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -50,6 +50,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -154,12 +155,12 @@ public class JobCancellationWithSavepointHandlers {
 				return graphFuture.thenApplyAsync(
 					(Optional<AccessExecutionGraph> optGraph) -> {
 						final AccessExecutionGraph graph = optGraph.orElseThrow(
-							() -> new FlinkFutureException(
+							() -> new CompletionException(
 								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
 
 						CheckpointCoordinator coord = graph.getCheckpointCoordinator();
 						if (coord == null) {
-							throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+							throw new CompletionException(new FlinkException("Cannot find CheckpointCoordinator for job."));
 						}
 
 						String targetDirectory = pathParams.get("targetDirectory");
@@ -177,7 +178,7 @@ public class JobCancellationWithSavepointHandlers {
 						try {
 							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
 						} catch (IOException e) {
-							throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+							throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
 						}
 					}, executor);
 			} else {
@@ -342,7 +343,7 @@ public class JobCancellationWithSavepointHandlers {
 							}
 						}
 					} catch (Exception e) {
-						throw new FlinkFutureException("Could not handle in progress request.", e);
+						throw new CompletionException(new FlinkException("Could not handle in progress request.", e));
 					}
 				});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
index bb1cf8f..787217f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -32,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -57,7 +58,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 				try {
 					return createJobConfigJson(graph);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not write job config json.", e);
+					throw new CompletionException(new FlinkException("Could not write job config json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
index dd6aee8..b9f812b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -76,7 +77,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 				try {
 					return createJobDetailsJson(graph, fetcher);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job details json.", e);
+					throw new CompletionException(new FlinkException("Could not create job details json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index 3236062..566631e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -63,7 +64,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 				try {
 					return createJobExceptionsJson(graph);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job exceptions json.", e);
+					throw new CompletionException(new FlinkException("Could not create job exceptions json.", e));
 				}
 			},
 			executor

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
index 364af91..d638467 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -79,7 +80,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 					gen.close();
 					return writer.toString();
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not write configuration.", e);
+					throw new CompletionException(new FlinkException("Could not write configuration.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
index cc41a1c..9eefcd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -64,7 +65,7 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
 					}
 				}
 				catch (Exception e) {
-					throw new FlinkFutureException("Failed to stop the job with id: "  + pathParams.get("jobid") + '.', e);
+					throw new CompletionException(new FlinkException("Failed to stop the job with id: "  + pathParams.get("jobid") + '.', e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
index 9830ab4..d448027 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -34,6 +34,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -59,7 +60,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 				try {
 					return createVertexAccumulatorsJson(jobVertex);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+					throw new CompletionException(new FlinkException("Could not create job vertex accumulators json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
index 3f0c77c..bfa7020 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -40,6 +40,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -69,7 +70,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 				try {
 					return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not write the vertex details json.", e);
+					throw new CompletionException(new FlinkException("Could not write the vertex details json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
index fa4ab67..985ea1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -42,6 +42,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -71,7 +72,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 				try {
 					return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create TaskManager json.", e);
+					throw new CompletionException(new FlinkException("Could not create TaskManager json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
index be4fe0b..1570896 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 				try {
 					return createAttemptAccumulatorsJson(execAttempt);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create accumulator json.", e);
+					throw new CompletionException(new FlinkException("Could not create accumulator json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
index 83a8793..b0b22ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -41,6 +41,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
@@ -71,7 +72,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 				try {
 					return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create attempt details json.", e);
+					throw new CompletionException(new FlinkException("Could not create attempt details json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
index 6d0757d..10d9e02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -61,7 +62,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 				try {
 					return createSubtasksAccumulatorsJson(jobVertex);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
+					throw new CompletionException(new FlinkException("Could not create subtasks accumulator json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
index 13fdc16..bf1d87e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -62,7 +63,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 				try {
 					return createSubtaskTimesJson(jobVertex);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not write subtask time json.", e);
+					throw new CompletionException(new FlinkException("Could not write subtask time json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 718657e..e84491c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -32,13 +32,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.blob.BlobView;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.WebHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -74,6 +74,7 @@ import java.util.HashMap;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
@@ -160,7 +161,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 					try {
 						return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
 					} catch (IOException e) {
-						throw new FlinkFutureException("Could not create BlobCache.", e);
+						throw new CompletionException(new FlinkException("Could not create BlobCache.", e));
 					}
 				},
 				executor);
@@ -178,7 +179,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 				CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
 					(Optional<Instance> optTMInstance) -> {
 						Instance taskManagerInstance = optTMInstance.orElseThrow(
-							() -> new FlinkFutureException("Could not find instance with " + instanceID + '.'));
+							() -> new CompletionException(new FlinkException("Could not find instance with " + instanceID + '.')));
 						switch (fileMode) {
 							case LOG:
 								return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
@@ -200,7 +201,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 									try {
 										blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
 									} catch (IOException e) {
-										throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e);
+										throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.', e));
 									}
 									lastSubmittedFile.put(taskManagerID, blobKey);
 								}
@@ -210,7 +211,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
 							try {
 								return blobCache.getFile(blobKey).getAbsolutePath();
 							} catch (IOException e) {
-								throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
+								throw new CompletionException(new FlinkException("Could not retrieve blob for " + blobKey + '.', e));
 							}
 						},
 						executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 95d417a..0880d0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.StringUtils;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -37,6 +37,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static java.util.Objects.requireNonNull;
@@ -83,7 +84,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 								optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
 								pathParams);
 						} catch (IOException e) {
-							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+							throw new CompletionException(new FlinkException("Could not write TaskManagers JSON.", e));
 						}
 					},
 					executor);
@@ -95,7 +96,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 						try {
 							return writeTaskManagersJson(taskManagers, pathParams);
 						} catch (IOException e) {
-							throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+							throw new CompletionException(new FlinkException("Could not write TaskManagers JSON.", e));
 						}
 					},
 					executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 2086628..27a64ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -36,6 +36,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -61,7 +62,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 				try {
 					return createCheckpointConfigJson(graph);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create checkpoint config json.", e);
+					throw new CompletionException(new FlinkException("Could not create checkpoint config json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
index 61ebeda..b61c5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,13 +24,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -42,6 +42,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -92,7 +93,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 				try {
 					return createCheckpointDetailsJson(checkpoint);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create checkpoint details json.", e);
+					throw new CompletionException(new FlinkException("Could not create checkpoint details json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index abb353e..bea94f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -27,13 +27,13 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
@@ -45,6 +45,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -70,7 +71,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 				try {
 					return createCheckpointStatsJson(graph);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+					throw new CompletionException(new FlinkException("Could not create checkpoint stats json.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
index 315bdc2..6cf83c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 /**
@@ -62,7 +63,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler
 						? getMetricsValues(pathParams, requestedMetricsList)
 						: getAvailableMetricsList(pathParams);
 				} catch (IOException e) {
-					throw new FlinkFutureException("Could not retrieve metrics.", e);
+					throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
 				}
 			},
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 6bb5ab5..aad8816 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.blob;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -397,7 +399,7 @@ public class BlobServerDeleteTest extends TestLogger {
 						try (BlobClient blobClient = blobServer.createClient()) {
 							deleteHelper(blobClient, jobId, blobKey);
 						} catch (IOException e) {
-							throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
+							throw new CompletionException(new FlinkException("Could not delete the given blob key " + blobKey + '.', e));
 						}
 
 						return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 7ccf075..d3d3484 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -36,7 +37,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-
 import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.security.MessageDigest;
@@ -45,12 +45,17 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -303,7 +308,7 @@ public class BlobServerGetTest extends TestLogger {
 
 							return new ByteArrayInputStream(buffer);
 						} catch (IOException e) {
-							throw new FlinkFutureException("Could not read blob for key " + blobKey + '.', e);
+							throw new CompletionException(new FlinkException("Could not read blob for key " + blobKey + '.', e));
 						}
 					},
 					executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 2b8e2d2..d35c17e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -22,11 +22,12 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -41,6 +42,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -475,7 +477,7 @@ public class BlobServerPutTest extends TestLogger {
 									.put(jobId, new BlockingInputStream(countDownLatch, data));
 							}
 						} catch (IOException e) {
-							throw new FlinkFutureException("Could not upload blob.", e);
+							throw new CompletionException(new FlinkException("Could not upload blob.", e));
 						}
 					},
 					executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index c624ef2..b779bc9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ public class FutureUtilsTest extends TestLogger {
 						if (atomicInteger.incrementAndGet() == retries) {
 							return true;
 						} else {
-							throw new FlinkFutureException("Test exception");
+							throw new CompletionException(new FlinkException("Test exception"));
 						}
 					},
 					TestingUtils.defaultExecutor()),
@@ -119,7 +120,7 @@ public class FutureUtilsTest extends TestLogger {
 							}
 						}
 
-						throw new FlinkFutureException("Test exception");
+						throw new CompletionException(new FlinkException("Test exception"));
 					},
 					TestingUtils.defaultExecutor()),
 			retries,

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 80b445f..4728b18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -40,11 +39,13 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -977,7 +978,7 @@ public class SlotManagerTest extends TestLogger {
 					try {
 						return slotManager.registerSlotRequest(slotRequest);
 					} catch (SlotManagerException e) {
-						throw new FlinkFutureException(e);
+						throw new CompletionException(e);
 					}
 				},
 				mainThreadExecutor)

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index 6162a2d..47a37d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.exceptions.RpcException;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -326,7 +327,7 @@ public class FencedRpcEndpointTest extends TestLogger {
 					try {
 						computationLatch.await();
 					} catch (InterruptedException e) {
-						throw new FlinkFutureException("Waiting on latch failed.", e);
+						throw new CompletionException(new FlinkException("Waiting on latch failed.", e));
 					}
 
 					return value;

http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index c7b811a..1bc3c20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators.async.queue;
 
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,6 +33,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -98,7 +98,7 @@ public class OrderedStreamElementQueueTest extends TestLogger {
 					try {
 						result.add(queue.poll());
 					} catch (InterruptedException e) {
-						throw new FlinkFutureException(e);
+						throw new CompletionException(e);
 					}
 				}