You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/26 09:25:51 UTC
[1/2] flink git commit: [FLINK-7664] Replace FlinkFutureException by
java.util.concurrent.CompletionException
Repository: flink
Updated Branches:
refs/heads/master e9decac62 -> d3458ec7b
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
index 7315f65..1c286fa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -37,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -180,7 +180,7 @@ public class StreamElementQueueTest extends TestLogger {
try {
queue.put(streamRecordQueueEntry2);
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
@@ -220,7 +220,7 @@ public class StreamElementQueueTest extends TestLogger {
try {
return queue.peekBlockingly();
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
@@ -244,7 +244,7 @@ public class StreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
index acc6b8e..73bdcc5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -35,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -104,7 +104,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
@@ -125,7 +125,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
@@ -171,7 +171,7 @@ public class UnorderedStreamElementQueueTest extends TestLogger {
try {
return queue.poll();
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
[2/2] flink git commit: [FLINK-7664] Replace FlinkFutureException by
java.util.concurrent.CompletionException
Posted by tr...@apache.org.
[FLINK-7664] Replace FlinkFutureException by java.util.concurrent.CompletionException
FlinkFutureException was introduced to fail a CompletableFuture callback. However, the JDK
already provides a class for this purpose, java.util.concurrent.CompletionException, which
allows failures to be handled better across different stages. Therefore we replace
FlinkFutureException by CompletionException and remove the former.
This closes #4701.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3458ec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3458ec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3458ec7
Branch: refs/heads/master
Commit: d3458ec7ba0a8e61d07dc5330bda8a1f81de81fe
Parents: e9decac
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu Sep 21 19:21:07 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 26 11:25:42 2017 +0200
----------------------------------------------------------------------
.../webmonitor/handlers/JarDeleteHandler.java | 5 ++-
.../webmonitor/handlers/JarListHandler.java | 5 ++-
.../webmonitor/handlers/JarPlanHandler.java | 4 +-
.../webmonitor/handlers/JarRunHandler.java | 5 ++-
.../runtime/akka/AkkaJobManagerGateway.java | 19 ++++----
.../concurrent/FlinkFutureException.java | 47 --------------------
.../runtime/jobmaster/JobManagerRunner.java | 5 ++-
.../resourcemanager/ResourceManagerRunner.java | 6 ++-
.../AbstractExecutionGraphRequestHandler.java | 4 +-
.../handler/legacy/ClusterOverviewHandler.java | 4 +-
.../handler/legacy/CurrentJobIdsHandler.java | 5 ++-
.../legacy/CurrentJobsOverviewHandler.java | 5 ++-
.../handler/legacy/JobAccumulatorsHandler.java | 5 ++-
.../handler/legacy/JobCancellationHandler.java | 5 ++-
.../JobCancellationWithSavepointHandlers.java | 11 ++---
.../rest/handler/legacy/JobConfigHandler.java | 5 ++-
.../rest/handler/legacy/JobDetailsHandler.java | 5 ++-
.../handler/legacy/JobExceptionsHandler.java | 5 ++-
.../handler/legacy/JobManagerConfigHandler.java | 5 ++-
.../rest/handler/legacy/JobStoppingHandler.java | 5 ++-
.../legacy/JobVertexAccumulatorsHandler.java | 5 ++-
.../handler/legacy/JobVertexDetailsHandler.java | 5 ++-
.../legacy/JobVertexTaskManagersHandler.java | 5 ++-
...taskExecutionAttemptAccumulatorsHandler.java | 5 ++-
.../SubtaskExecutionAttemptDetailsHandler.java | 5 ++-
.../legacy/SubtasksAllAccumulatorsHandler.java | 5 ++-
.../handler/legacy/SubtasksTimesHandler.java | 5 ++-
.../handler/legacy/TaskManagerLogHandler.java | 11 ++---
.../handler/legacy/TaskManagersHandler.java | 7 +--
.../checkpoints/CheckpointConfigHandler.java | 5 ++-
.../CheckpointStatsDetailsHandler.java | 5 ++-
.../checkpoints/CheckpointStatsHandler.java | 5 ++-
.../legacy/metrics/AbstractMetricsHandler.java | 5 ++-
.../runtime/blob/BlobServerDeleteTest.java | 6 ++-
.../flink/runtime/blob/BlobServerGetTest.java | 17 ++++---
.../flink/runtime/blob/BlobServerPutTest.java | 6 ++-
.../runtime/concurrent/FutureUtilsTest.java | 5 ++-
.../slotmanager/SlotManagerTest.java | 5 ++-
.../runtime/rpc/FencedRpcEndpointTest.java | 5 ++-
.../queue/OrderedStreamElementQueueTest.java | 4 +-
.../async/queue/StreamElementQueueTest.java | 8 ++--
.../queue/UnorderedStreamElementQueueTest.java | 8 ++--
42 files changed, 146 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 04f663d..cf49b07 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -30,6 +30,7 @@ import java.io.FilenameFilter;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -80,7 +81,7 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
return writer.toString();
}
catch (Exception e) {
- throw new FlinkFutureException("Failed to delete jar id " + pathParams.get("jarid") + '.', e);
+ throw new CompletionException(new FlinkException("Failed to delete jar id " + pathParams.get("jarid") + '.', e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4248dd4..746870b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.webmonitor.handlers;
import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
@@ -140,7 +141,7 @@ public class JarListHandler extends AbstractJsonRequestHandler {
return writer.toString();
}
catch (Exception e) {
- throw new FlinkFutureException("Failed to fetch jar list.", e);
+ throw new CompletionException(new FlinkException("Failed to fetch jar list.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 4d79492..aa7c056 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.webmonitor.handlers;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -30,6 +29,7 @@ import java.io.File;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -65,7 +65,7 @@ public class JarPlanHandler extends JarActionHandler {
return writer.toString();
}
catch (Exception e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 16a1565..debdfa1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -24,10 +24,10 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.io.File;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -86,7 +87,7 @@ public class JarRunHandler extends JarActionHandler {
gen.close();
return writer.toString();
} catch (Exception e) {
- throw new FlinkFutureException("Could not run the jar.", e);
+ throw new CompletionException(new FlinkException("Could not run the jar.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 23ba6b3..18025dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.akka;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
@@ -36,12 +35,14 @@ import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.RequestStatusOverview;
import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import scala.Option;
import scala.reflect.ClassTag$;
@@ -101,15 +102,15 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
return Acknowledge.get();
} else {
- throw new FlinkFutureException("JobManager responded for wrong Job. This Job: " +
- jobGraph.getJobID() + ", response: " + success.jobId());
+ throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
+ jobGraph.getJobID() + ", response: " + success.jobId()));
}
} else if (response instanceof JobManagerMessages.JobResultFailure) {
JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);
- throw new FlinkFutureException("Job submission failed.", failure.cause());
+ throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
} else {
- throw new FlinkFutureException("Unknown response to SubmitJob message: " + response + '.');
+ throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
}
}
);
@@ -127,7 +128,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (response instanceof JobManagerMessages.CancellationSuccess) {
return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
} else {
- throw new FlinkFutureException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause());
+ throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
}
});
}
@@ -144,7 +145,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (response instanceof JobManagerMessages.CancellationSuccess) {
return Acknowledge.get();
} else {
- throw new FlinkFutureException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause());
+ throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
}
});
}
@@ -161,7 +162,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
if (response instanceof JobManagerMessages.StoppingSuccess) {
return Acknowledge.get();
} else {
- throw new FlinkFutureException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause());
+ throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
}
});
}
@@ -211,7 +212,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
} else if (response instanceof JobManagerMessages.JobNotFound) {
return Optional.empty();
} else {
- throw new FlinkFutureException("Unknown response: " + response + '.');
+ throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
}
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
deleted file mode 100644
index c728904..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.concurrent;
-
-import org.apache.flink.util.FlinkRuntimeException;
-
-import java.util.concurrent.CompletionStage;
-
-/**
- * Base class for exceptions which are thrown in {@link CompletionStage}.
- *
- * <p>The exception has to extend {@link FlinkRuntimeException} because only
- * unchecked exceptions can be thrown in a future's stage. Additionally we let
- * it extend the Flink runtime exception because it designates the exception to
- * come from a Flink stage.
- */
-public class FlinkFutureException extends FlinkRuntimeException {
- private static final long serialVersionUID = -8878194471694178210L;
-
- public FlinkFutureException(String message) {
- super(message);
- }
-
- public FlinkFutureException(Throwable cause) {
- super(cause);
- }
-
- public FlinkFutureException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 0bf0cc2..70abf2f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -40,6 +39,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -223,7 +224,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
}
if (exception != null) {
- throw new FlinkFutureException("Could not properly shut down the JobManagerRunner.", exception);
+ throw new CompletionException(new FlinkException("Could not properly shut down the JobManagerRunner.", exception));
}
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index ed6e18c..caa3ba0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -20,17 +20,19 @@ package org.apache.flink.runtime.resourcemanager;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
/**
* Simple {@link StandaloneResourceManager} runner. It instantiates the resource manager's services
@@ -107,7 +109,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
try {
resourceManagerRuntimeServices.shutDown();
} catch (Exception e) {
- throw new FlinkFutureException("Could not properly shut down the resource manager runtime services.", e);
+ throw new CompletionException(new FlinkException("Could not properly shut down the resource manager runtime services.", e));
}
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
index e214a36..88932b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -30,6 +29,7 @@ import org.apache.flink.util.Preconditions;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -70,7 +70,7 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
if (optGraph.isPresent()) {
return handleRequest(optGraph.get(), pathParams);
} else {
- throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
+ throw new CompletionException(new NotFoundException("Could not find job with jobId " + jid + '.'));
}
}, executor);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 794ff20..2f883ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -39,6 +38,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders.CLUSTER_OVERVIEW_REST_PATH;
@@ -96,7 +96,7 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler implement
gen.close();
return writer.toString();
} catch (IOException exception) {
- throw new FlinkFutureException("Could not write cluster overview.", exception);
+ throw new CompletionException(new FlinkException("Could not write cluster overview.", exception));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
index 07d9707..b806f86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -20,15 +20,16 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@@ -104,7 +105,7 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
}
}
catch (Exception e) {
- throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
+ throw new CompletionException(new FlinkException("Failed to fetch list of all running jobs.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
index b1939e5..669ef32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -42,6 +42,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -129,7 +130,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler imple
gen.close();
return writer.toString();
} catch (IOException e) {
- throw new FlinkFutureException("Could not write current jobs overview json.", e);
+ throw new CompletionException(new FlinkException("Could not write current jobs overview json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
index 0a3b050..68810eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
@@ -19,10 +19,10 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -57,7 +58,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
try {
return createJobAccumulatorsJson(graph);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create job accumulators json.", e);
+ throw new CompletionException(new FlinkException("Could not create job accumulators json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
index a194f30..9e9849f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -64,7 +65,7 @@ public class JobCancellationHandler extends AbstractJsonRequestHandler {
}
}
catch (Exception e) {
- throw new FlinkFutureException("Failed to cancel the job with id: " + pathParams.get("jobid"), e);
+ throw new CompletionException(new FlinkException("Failed to cancel the job with id: " + pathParams.get("jobid"), e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
index 23e94f5..fe71a52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -50,6 +50,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -154,12 +155,12 @@ public class JobCancellationWithSavepointHandlers {
return graphFuture.thenApplyAsync(
(Optional<AccessExecutionGraph> optGraph) -> {
final AccessExecutionGraph graph = optGraph.orElseThrow(
- () -> new FlinkFutureException(
+ () -> new CompletionException(
new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
CheckpointCoordinator coord = graph.getCheckpointCoordinator();
if (coord == null) {
- throw new FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for job."));
+ throw new CompletionException(new FlinkException("Cannot find CheckpointCoordinator for job."));
}
String targetDirectory = pathParams.get("targetDirectory");
@@ -177,7 +178,7 @@ public class JobCancellationWithSavepointHandlers {
try {
return handleNewRequest(jobManagerGateway, jobId, targetDirectory, coord.getCheckpointTimeout());
} catch (IOException e) {
- throw new FlinkFutureException("Could not cancel job with savepoint.", e);
+ throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
}
}, executor);
} else {
@@ -342,7 +343,7 @@ public class JobCancellationWithSavepointHandlers {
}
}
} catch (Exception e) {
- throw new FlinkFutureException("Could not handle in progress request.", e);
+ throw new CompletionException(new FlinkException("Could not handle in progress request.", e));
}
});
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
index bb1cf8f..787217f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
@@ -19,10 +19,10 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -57,7 +58,7 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
try {
return createJobConfigJson(graph);
} catch (IOException e) {
- throw new FlinkFutureException("Could not write job config json.", e);
+ throw new CompletionException(new FlinkException("Could not write job config json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
index dd6aee8..b9f812b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -76,7 +77,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
try {
return createJobDetailsJson(graph, fetcher);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create job details json.", e);
+ throw new CompletionException(new FlinkException("Could not create job details json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index 3236062..566631e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -63,7 +64,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
try {
return createJobExceptionsJson(graph);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create job exceptions json.", e);
+ throw new CompletionException(new FlinkException("Could not create job exceptions json.", e));
}
},
executor
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
index 364af91..d638467 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
@@ -19,8 +19,8 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -79,7 +80,7 @@ public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
gen.close();
return writer.toString();
} catch (IOException e) {
- throw new FlinkFutureException("Could not write configuration.", e);
+ throw new CompletionException(new FlinkException("Could not write configuration.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
index cc41a1c..9eefcd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
@@ -20,13 +20,14 @@ package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -64,7 +65,7 @@ public class JobStoppingHandler extends AbstractJsonRequestHandler {
}
}
catch (Exception e) {
- throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e);
+ throw new CompletionException(new FlinkException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
index 9830ab4..d448027 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -34,6 +34,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -59,7 +60,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
try {
return createVertexAccumulatorsJson(jobVertex);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+ throw new CompletionException(new FlinkException("Could not create job vertex accumulators json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
index 3f0c77c..bfa7020 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -69,7 +70,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
try {
return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
} catch (IOException e) {
- throw new FlinkFutureException("Could not write the vertex details json.", e);
+ throw new CompletionException(new FlinkException("Could not write the vertex details json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
index fa4ab67..985ea1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -42,6 +42,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -71,7 +72,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
try {
return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create TaskManager json.", e);
+ throw new CompletionException(new FlinkException("Could not create TaskManager json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
index be4fe0b..1570896 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -19,13 +19,13 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -62,7 +63,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
try {
return createAttemptAccumulatorsJson(execAttempt);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create accumulator json.", e);
+ throw new CompletionException(new FlinkException("Could not create accumulator json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
index 83a8793..b0b22ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -41,6 +41,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
@@ -71,7 +72,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
try {
return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create attempt details json.", e);
+ throw new CompletionException(new FlinkException("Could not create attempt details json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
index 6d0757d..10d9e02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
@@ -19,13 +19,13 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -61,7 +62,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
try {
return createSubtasksAccumulatorsJson(jobVertex);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create subtasks accumulator json.", e);
+ throw new CompletionException(new FlinkException("Could not create subtasks accumulator json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
index 13fdc16..bf1d87e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -26,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -62,7 +63,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
try {
return createSubtaskTimesJson(jobVertex);
} catch (IOException e) {
- throw new FlinkFutureException("Could not write subtask time json.", e);
+ throw new CompletionException(new FlinkException("Could not write subtask time json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 718657e..e84491c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -32,13 +32,13 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobView;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.RedirectHandler;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -74,6 +74,7 @@ import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -160,7 +161,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
try {
return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create BlobCache.", e);
+ throw new CompletionException(new FlinkException("Could not create BlobCache.", e));
}
},
executor);
@@ -178,7 +179,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
CompletableFuture<BlobKey> blobKeyFuture = taskManagerFuture.thenCompose(
(Optional<Instance> optTMInstance) -> {
Instance taskManagerInstance = optTMInstance.orElseThrow(
- () -> new FlinkFutureException("Could not find instance with " + instanceID + '.'));
+ () -> new CompletionException(new FlinkException("Could not find instance with " + instanceID + '.')));
switch (fileMode) {
case LOG:
return taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
@@ -200,7 +201,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
try {
blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
} catch (IOException e) {
- throw new FlinkFutureException("Could not delete file for " + taskManagerID + '.', e);
+ throw new CompletionException(new FlinkException("Could not delete file for " + taskManagerID + '.', e));
}
lastSubmittedFile.put(taskManagerID, blobKey);
}
@@ -210,7 +211,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im
try {
return blobCache.getFile(blobKey).getAbsolutePath();
} catch (IOException e) {
- throw new FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
+ throw new CompletionException(new FlinkException("Could not retrieve blob for " + blobKey + '.', e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 95d417a..0880d0c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -19,13 +19,13 @@
package org.apache.flink.runtime.rest.handler.legacy;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import static java.util.Objects.requireNonNull;
@@ -83,7 +84,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
pathParams);
} catch (IOException e) {
- throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ throw new CompletionException(new FlinkException("Could not write TaskManagers JSON.", e));
}
},
executor);
@@ -95,7 +96,7 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
try {
return writeTaskManagersJson(taskManagers, pathParams);
} catch (IOException e) {
- throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
+ throw new CompletionException(new FlinkException("Could not write TaskManagers JSON.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 2086628..27a64ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
@@ -27,6 +26,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -61,7 +62,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
try {
return createCheckpointConfigJson(graph);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create checkpoint config json.", e);
+ throw new CompletionException(new FlinkException("Could not create checkpoint config json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
index 61ebeda..b61c5d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -24,13 +24,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -92,7 +93,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
try {
return createCheckpointDetailsJson(checkpoint);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create checkpoint details json.", e);
+ throw new CompletionException(new FlinkException("Could not create checkpoint details json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index abb353e..bea94f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -27,13 +27,13 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.FlinkException;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -45,6 +45,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -70,7 +71,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
try {
return createCheckpointStatsJson(graph);
} catch (IOException e) {
- throw new FlinkFutureException("Could not create checkpoint stats json.", e);
+ throw new CompletionException(new FlinkException("Could not create checkpoint stats json.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
index 315bdc2..6cf83c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java
@@ -18,10 +18,10 @@
package org.apache.flink.runtime.rest.handler.legacy.metrics;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
/**
@@ -62,7 +63,7 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler
? getMetricsValues(pathParams, requestedMetricsList)
: getAvailableMetricsList(pathParams);
} catch (IOException e) {
- throw new FlinkFutureException("Could not retrieve metrics.", e);
+ throw new CompletionException(new FlinkException("Could not retrieve metrics.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
index 6bb5ab5..aad8816 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java
@@ -21,10 +21,11 @@ package org.apache.flink.runtime.blob;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.TestLogger;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -37,6 +38,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -397,7 +399,7 @@ public class BlobServerDeleteTest extends TestLogger {
try (BlobClient blobClient = blobServer.createClient()) {
deleteHelper(blobClient, jobId, blobKey);
} catch (IOException e) {
- throw new FlinkFutureException("Could not delete the given blob key " + blobKey + '.', e);
+ throw new CompletionException(new FlinkException("Could not delete the given blob key " + blobKey + '.', e));
}
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
index 7ccf075..d3d3484 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerGetTest.java
@@ -18,14 +18,15 @@
package org.apache.flink.runtime.blob;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -36,7 +37,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.security.MessageDigest;
@@ -45,12 +45,17 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.apache.flink.runtime.blob.BlobClientTest.validateGetAndClose;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
@@ -303,7 +308,7 @@ public class BlobServerGetTest extends TestLogger {
return new ByteArrayInputStream(buffer);
} catch (IOException e) {
- throw new FlinkFutureException("Could not read blob for key " + blobKey + '.', e);
+ throw new CompletionException(new FlinkException("Could not read blob for key " + blobKey + '.', e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
index 2b8e2d2..d35c17e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerPutTest.java
@@ -22,11 +22,12 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -41,6 +42,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -475,7 +477,7 @@ public class BlobServerPutTest extends TestLogger {
.put(jobId, new BlockingInputStream(countDownLatch, data));
}
} catch (IOException e) {
- throw new FlinkFutureException("Could not upload blob.", e);
+ throw new CompletionException(new FlinkException("Could not upload blob.", e));
}
},
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index c624ef2..b779bc9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -29,6 +29,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -65,7 +66,7 @@ public class FutureUtilsTest extends TestLogger {
if (atomicInteger.incrementAndGet() == retries) {
return true;
} else {
- throw new FlinkFutureException("Test exception");
+ throw new CompletionException(new FlinkException("Test exception"));
}
},
TestingUtils.defaultExecutor()),
@@ -119,7 +120,7 @@ public class FutureUtilsTest extends TestLogger {
}
}
- throw new FlinkFutureException("Test exception");
+ throw new CompletionException(new FlinkException("Test exception"));
},
TestingUtils.defaultExecutor()),
retries,
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 80b445f..4728b18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
@@ -40,11 +39,13 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -977,7 +978,7 @@ public class SlotManagerTest extends TestLogger {
try {
return slotManager.registerSlotRequest(slotRequest);
} catch (SlotManagerException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
},
mainThreadExecutor)
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index 6162a2d..47a37d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
@@ -33,6 +33,7 @@ import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -326,7 +327,7 @@ public class FencedRpcEndpointTest extends TestLogger {
try {
computationLatch.await();
} catch (InterruptedException e) {
- throw new FlinkFutureException("Waiting on latch failed.", e);
+ throw new CompletionException(new FlinkException("Waiting on latch failed.", e));
}
return value;
http://git-wip-us.apache.org/repos/asf/flink/blob/d3458ec7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
index c7b811a..1bc3c20 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.operators.async.queue;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
import org.apache.flink.streaming.api.operators.async.OperatorActions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,6 +33,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -98,7 +98,7 @@ public class OrderedStreamElementQueueTest extends TestLogger {
try {
result.add(queue.poll());
} catch (InterruptedException e) {
- throw new FlinkFutureException(e);
+ throw new CompletionException(e);
}
}