You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 11:56:43 UTC
flink git commit: [FLINK-7322] [futures] Replace Flink's futures with
Java 8's CompletableFuture in CheckpointCoordinator
Repository: flink
Updated Branches:
refs/heads/master 4a9f19b9f -> 4378ac3ae
[FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture in CheckpointCoordinator
Fix failing JobManagerITCase
This closes #4436.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4378ac3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4378ac3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4378ac3a
Branch: refs/heads/master
Commit: 4378ac3ae36f12c8678d2090f7c344832d6d0761
Parents: 4a9f19b
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 19:05:22 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 13:54:50 2017 +0200
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 41 ++++++++++----------
.../runtime/checkpoint/PendingCheckpoint.java | 9 ++---
.../flink/runtime/jobmanager/JobManager.scala | 11 +++---
.../checkpoint/CheckpointCoordinatorTest.java | 20 +++++-----
.../checkpoint/PendingCheckpointTest.java | 4 +-
.../runtime/jobmanager/JobManagerITCase.scala | 7 ++--
.../testingUtils/TestingJobManagerLike.scala | 3 +-
7 files changed, 48 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3e36158..5cab7f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -26,9 +26,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -58,6 +55,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -362,7 +360,7 @@ public class CheckpointCoordinator {
* configured
* @throws Exception Failures during triggering are forwarded
*/
- public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
+ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
checkNotNull(targetDirectory, "Savepoint target directory");
CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
@@ -377,29 +375,30 @@ public class CheckpointCoordinator {
savepointDirectory,
false);
- Future<CompletedCheckpoint> result;
+ CompletableFuture<CompletedCheckpoint> result;
if (triggerResult.isSuccess()) {
result = triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
- result = FlinkCompletableFuture.completedExceptionally(cause);
+ result = new CompletableFuture<>();
+ result.completeExceptionally(cause);
+ return result;
}
// Make sure to remove the created base directory on Exceptions
- result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
- @Override
- public Void apply(Throwable value) {
- try {
- SavepointStore.deleteSavepointDirectory(savepointDirectory);
- } catch (Throwable t) {
- LOG.warn("Failed to delete savepoint directory " + savepointDirectory
- + " after failed savepoint.", t);
+ result.whenCompleteAsync(
+ (CompletedCheckpoint checkpoint, Throwable throwable) -> {
+ if (throwable != null) {
+ try {
+ SavepointStore.deleteSavepointDirectory(savepointDirectory);
+ } catch (Throwable t) {
+ LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+ + " after failed savepoint.", t);
+ }
}
-
- return null;
- }
- }, executor);
+ },
+ executor);
return result;
}
@@ -427,7 +426,7 @@ public class CheckpointCoordinator {
*/
@VisibleForTesting
@Internal
- public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
+ public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
switch (options.getCheckpointType()) {
case SAVEPOINT:
return triggerSavepoint(timestamp, options.getTargetLocation());
@@ -440,7 +439,9 @@ public class CheckpointCoordinator {
return triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
- return FlinkCompletableFuture.completedExceptionally(cause);
+ CompletableFuture<CompletedCheckpoint> failedResult = new CompletableFuture<>();
+ failedResult.completeExceptionally(cause);
+ return failedResult;
}
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 0633fec..3472fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,6 +45,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
@@ -104,7 +103,7 @@ public class PendingCheckpoint {
private final String targetDirectory;
/** The promise to fulfill once the checkpoint has been completed. */
- private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise;
+ private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
/** The executor for potentially blocking I/O operations, like state disposal */
private final Executor executor;
@@ -149,7 +148,7 @@ public class PendingCheckpoint {
this.operatorStates = new HashMap<>();
this.masterState = new ArrayList<>();
this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
- this.onCompletionPromise = new FlinkCompletableFuture<>();
+ this.onCompletionPromise = new CompletableFuture<>();
}
// --------------------------------------------------------------------------------------------
@@ -249,7 +248,7 @@ public class PendingCheckpoint {
*
* @return A future to the completed checkpoint
*/
- public Future<CompletedCheckpoint> getCompletionFuture() {
+ public CompletableFuture<CompletedCheckpoint> getCompletionFuture() {
return onCompletionPromise;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3128cfc..a6712ad 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager
import java.io.IOException
import java.net._
import java.util.UUID
-import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
+import java.util.concurrent.{Future => JavaFuture, _}
+import java.util.function.BiFunction
import akka.actor.Status.{Failure, Success}
import akka.actor._
@@ -38,13 +39,13 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore}
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
import org.apache.flink.runtime.client._
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.messages._
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, Executors => FlinkExecutors}
import org.apache.flink.runtime.execution.SuppressRestartsException
import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
import org.apache.flink.runtime.jobmaster.JobMaster
import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
-import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -80,12 +80,11 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.security.SecurityUtils
import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
import org.apache.flink.runtime.taskexecutor.TaskExecutor
-import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util._
import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
+import org.apache.flink.util.{InstantiationUtil, NetUtils}
import scala.collection.JavaConverters._
import scala.collection.mutable
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..e78152a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -76,6 +75,7 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1446,7 +1446,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
// trigger the first checkpoint. this should succeed
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
- Future<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
assertFalse(savepointFuture.isDone());
// validate that we have a pending savepoint
@@ -1601,7 +1601,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
// Trigger savepoint and checkpoint
- Future<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
long savepointId1 = counter.getLast();
assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
long checkpointId3 = counter.getLast();
assertEquals(2, coord.getNumberOfPendingCheckpoints());
- Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
long savepointId2 = counter.getLast();
assertEquals(3, coord.getNumberOfPendingCheckpoints());
@@ -1911,7 +1911,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
null,
Executors.directExecutor());
- List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
+ List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
int numSavepoints = 5;
@@ -1923,7 +1923,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
// After triggering multiple savepoints, all should in progress
- for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) {
+ for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
assertFalse(savepointFuture.isDone());
}
@@ -1934,7 +1934,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
// After ACKs, all should be completed
- for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) {
+ for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
assertTrue(savepointFuture.isDone());
}
}
@@ -1966,10 +1966,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
- Future<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
assertFalse("Did not trigger savepoint", savepoint0.isDone());
- Future<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
assertFalse("Did not trigger savepoint", savepoint1.isDone());
}
@@ -3600,7 +3600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
// trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore
- Future<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+ CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index a96b597..7d103d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -40,6 +39,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
@@ -134,7 +134,7 @@ public class PendingCheckpointTest {
// Abort declined
PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
- Future<CompletedCheckpoint> future = pending.getCompletionFuture();
+ CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture();
assertFalse(future.isDone());
pending.abortDeclined();
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 5fb9ddf..e209608 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.jobmanager
+import java.util.concurrent.CompletableFuture
+
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import akka.util.Timeout
@@ -25,7 +27,6 @@ import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.akka.ListeningBehaviour
import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings}
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
@@ -913,7 +914,7 @@ class JobManagerITCase(_system: ActorSystem)
doThrow(new Exception("Expected Test Exception"))
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
- val savepointPathPromise = new FlinkCompletableFuture[CompletedCheckpoint]()
+ val savepointPathPromise = new CompletableFuture[CompletedCheckpoint]()
doReturn(savepointPathPromise)
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
@@ -982,7 +983,7 @@ class JobManagerITCase(_system: ActorSystem)
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
- val savepointPromise = new FlinkCompletableFuture[CompletedCheckpoint]()
+ val savepointPromise = new CompletableFuture[CompletedCheckpoint]()
doReturn(savepointPromise)
.when(checkpointCoordinator)
.triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 3d3af95..cd88133 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.testingUtils
+import java.util.function.BiFunction
+
import akka.actor.{ActorRef, Cancellable, Terminated}
import akka.pattern.{ask, pipe}
import org.apache.flink.api.common.JobID
@@ -25,7 +27,6 @@ import org.apache.flink.runtime.FlinkActor
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.concurrent.BiFunction
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager