You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/08/01 11:56:43 UTC

flink git commit: [FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture in CheckpointCoordinator

Repository: flink
Updated Branches:
  refs/heads/master 4a9f19b9f -> 4378ac3ae


[FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture in CheckpointCoordinator

Fix failing JobManagerITCase

This closes #4436.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4378ac3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4378ac3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4378ac3a

Branch: refs/heads/master
Commit: 4378ac3ae36f12c8678d2090f7c344832d6d0761
Parents: 4a9f19b
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jul 31 19:05:22 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Aug 1 13:54:50 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 41 ++++++++++----------
 .../runtime/checkpoint/PendingCheckpoint.java   |  9 ++---
 .../flink/runtime/jobmanager/JobManager.scala   | 11 +++---
 .../checkpoint/CheckpointCoordinatorTest.java   | 20 +++++-----
 .../checkpoint/PendingCheckpointTest.java       |  4 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  7 ++--
 .../testingUtils/TestingJobManagerLike.scala    |  3 +-
 7 files changed, 48 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3e36158..5cab7f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -26,9 +26,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -58,6 +55,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -362,7 +360,7 @@ public class CheckpointCoordinator {
 	 *                               configured
 	 * @throws Exception             Failures during triggering are forwarded
 	 */
-	public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
+	public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, String targetDirectory) throws Exception {
 		checkNotNull(targetDirectory, "Savepoint target directory");
 
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
@@ -377,29 +375,30 @@ public class CheckpointCoordinator {
 			savepointDirectory,
 			false);
 
-		Future<CompletedCheckpoint> result;
+		CompletableFuture<CompletedCheckpoint> result;
 
 		if (triggerResult.isSuccess()) {
 			result = triggerResult.getPendingCheckpoint().getCompletionFuture();
 		} else {
 			Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
-			result = FlinkCompletableFuture.completedExceptionally(cause);
+			result = new CompletableFuture<>();
+			result.completeExceptionally(cause);
+			return result;
 		}
 
 		// Make sure to remove the created base directory on Exceptions
-		result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
-			@Override
-			public Void apply(Throwable value) {
-				try {
-					SavepointStore.deleteSavepointDirectory(savepointDirectory);
-				} catch (Throwable t) {
-					LOG.warn("Failed to delete savepoint directory " + savepointDirectory
-						+ " after failed savepoint.", t);
+		result.whenCompleteAsync(
+			(CompletedCheckpoint checkpoint, Throwable throwable) -> {
+				if (throwable != null) {
+					try {
+						SavepointStore.deleteSavepointDirectory(savepointDirectory);
+					} catch (Throwable t) {
+						LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+							+ " after failed savepoint.", t);
+					}
 				}
-
-				return null;
-			}
-		}, executor);
+			},
+			executor);
 
 		return result;
 	}
@@ -427,7 +426,7 @@ public class CheckpointCoordinator {
 	 */
 	@VisibleForTesting
 	@Internal
-	public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
+	public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
 		switch (options.getCheckpointType()) {
 			case SAVEPOINT:
 				return triggerSavepoint(timestamp, options.getTargetLocation());
@@ -440,7 +439,9 @@ public class CheckpointCoordinator {
 					return triggerResult.getPendingCheckpoint().getCompletionFuture();
 				} else {
 					Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
-					return FlinkCompletableFuture.completedExceptionally(cause);
+					CompletableFuture<CompletedCheckpoint> failedResult = new CompletableFuture<>();
+					failedResult.completeExceptionally(cause);
+					return failedResult;
 				}
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 0633fec..3472fc2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -47,6 +45,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 
@@ -104,7 +103,7 @@ public class PendingCheckpoint {
 	private final String targetDirectory;
 
 	/** The promise to fulfill once the checkpoint has been completed. */
-	private final FlinkCompletableFuture<CompletedCheckpoint> onCompletionPromise;
+	private final CompletableFuture<CompletedCheckpoint> onCompletionPromise;
 
 	/** The executor for potentially blocking I/O operations, like state disposal */
 	private final Executor executor;
@@ -149,7 +148,7 @@ public class PendingCheckpoint {
 		this.operatorStates = new HashMap<>();
 		this.masterState = new ArrayList<>();
 		this.acknowledgedTasks = new HashSet<>(verticesToConfirm.size());
-		this.onCompletionPromise = new FlinkCompletableFuture<>();
+		this.onCompletionPromise = new CompletableFuture<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -249,7 +248,7 @@ public class PendingCheckpoint {
 	 *
 	 * @return A future to the completed checkpoint
 	 */
-	public Future<CompletedCheckpoint> getCompletionFuture() {
+	public CompletableFuture<CompletedCheckpoint> getCompletionFuture() {
 		return onCompletionPromise;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3128cfc..a6712ad 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.jobmanager
 import java.io.IOException
 import java.net._
 import java.util.UUID
-import java.util.concurrent.{TimeUnit, Future => _, TimeoutException => _, _}
+import java.util.concurrent.{Future => JavaFuture, _}
+import java.util.function.BiFunction
 
 import akka.actor.Status.{Failure, Success}
 import akka.actor._
@@ -38,13 +39,13 @@ import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore}
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, BiFunction, Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{AcceptFunction, ApplyFunction, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
 import org.apache.flink.runtime.jobmaster.JobMaster
 import org.apache.flink.runtime.leaderelection.{LeaderContender, LeaderElectionService}
-import org.apache.flink.runtime.jobmaster.JobMaster.{ARCHIVE_NAME, JOB_MANAGER_NAME}
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -80,12 +80,11 @@ import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
-import org.apache.flink.runtime.taskexecutor.TaskExecutor.TASK_MANAGER_NAME
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
-import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
+import org.apache.flink.util.{InstantiationUtil, NetUtils}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..e78152a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -76,6 +75,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1446,7 +1446,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		// trigger the first checkpoint. this should succeed
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
-		Future<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture = coord.triggerSavepoint(timestamp, savepointDir);
 		assertFalse(savepointFuture.isDone());
 
 		// validate that we have a pending savepoint
@@ -1601,7 +1601,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
 		// Trigger savepoint and checkpoint
-		Future<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir);
 		long savepointId1 = counter.getLast();
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
@@ -1626,7 +1626,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		long checkpointId3 = counter.getLast();
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-		Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
 		long savepointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
@@ -1911,7 +1911,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			null,
 			Executors.directExecutor());
 
-		List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
+		List<CompletableFuture<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
 
 		int numSavepoints = 5;
 
@@ -1923,7 +1923,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 
 		// After triggering multiple savepoints, all should in progress
-		for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) {
+		for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
 			assertFalse(savepointFuture.isDone());
 		}
 
@@ -1934,7 +1934,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		}
 
 		// After ACKs, all should be completed
-		for (Future<CompletedCheckpoint> savepointFuture : savepointFutures) {
+		for (CompletableFuture<CompletedCheckpoint> savepointFuture : savepointFutures) {
 			assertTrue(savepointFuture.isDone());
 		}
 	}
@@ -1966,10 +1966,10 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
-		Future<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepoint0 = coord.triggerSavepoint(0, savepointDir);
 		assertFalse("Did not trigger savepoint", savepoint0.isDone());
 
-		Future<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepoint1 = coord.triggerSavepoint(1, savepointDir);
 		assertFalse("Did not trigger savepoint", savepoint1.isDone());
 	}
 
@@ -3600,7 +3600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		assertTrue(1 == completedCheckpointStore.getNumberOfRetainedCheckpoints());
 
 		// trigger a savepoint --> this should not have any effect on the CompletedCheckpointStore
-		Future<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
+		CompletableFuture<CompletedCheckpoint> savepointFuture = checkpointCoordinator.triggerSavepoint(savepointTimestamp, savepointDir);
 
 		checkpointCoordinator.receiveAcknowledgeMessage(
 			new AcknowledgeCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index a96b597..7d103d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -40,6 +39,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 
@@ -134,7 +134,7 @@ public class PendingCheckpointTest {
 
 		// Abort declined
 		PendingCheckpoint pending = createPendingCheckpoint(props, "ignored");
-		Future<CompletedCheckpoint> future = pending.getCompletionFuture();
+		CompletableFuture<CompletedCheckpoint> future = pending.getCompletionFuture();
 
 		assertFalse(future.isDone());
 		pending.abortDeclined();

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 5fb9ddf..e209608 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager
 
+import java.util.concurrent.CompletableFuture
+
 import akka.actor.ActorSystem
 import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.Timeout
@@ -25,7 +27,6 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedCheckpoint}
 import org.apache.flink.runtime.client.JobExecutionException
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType
 import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings}
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
@@ -913,7 +914,7 @@ class JobManagerITCase(_system: ActorSystem)
           doThrow(new Exception("Expected Test Exception"))
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
-          val savepointPathPromise = new FlinkCompletableFuture[CompletedCheckpoint]()
+          val savepointPathPromise = new CompletableFuture[CompletedCheckpoint]()
           doReturn(savepointPathPromise)
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
@@ -982,7 +983,7 @@ class JobManagerITCase(_system: ActorSystem)
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())
 
-          val savepointPromise = new FlinkCompletableFuture[CompletedCheckpoint]()
+          val savepointPromise = new CompletableFuture[CompletedCheckpoint]()
           doReturn(savepointPromise)
             .when(checkpointCoordinator)
             .triggerSavepoint(org.mockito.Matchers.anyLong(), org.mockito.Matchers.anyString())

http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 3d3af95..cd88133 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.testingUtils
 
+import java.util.function.BiFunction
+
 import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
@@ -25,7 +27,6 @@ import org.apache.flink.runtime.FlinkActor
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
-import org.apache.flink.runtime.concurrent.BiFunction
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager