Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/17 10:47:16 UTC

[flink] branch master updated: [FLINK-11069][core] Merge FutureUtil into FutureUtils

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 322e479  [FLINK-11069][core] Merge FutureUtil into FutureUtils
322e479 is described below

commit 322e479d540a74e45a9b5e38eb17b4eb3a4c6e9e
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Thu Jan 17 18:47:09 2019 +0800

    [FLINK-11069][core] Merge FutureUtil into FutureUtils
---
 .../java/org/apache/flink/util/FutureUtil.java     | 85 ----------------------
 .../flink/runtime/concurrent/FutureUtils.java      | 23 ++++++
 .../org/apache/flink/runtime/state/StateUtil.java  |  4 +-
 .../partition/PipelinedSubpartitionTest.java       | 19 +++--
 .../partition/consumer/LocalInputChannelTest.java  | 33 +++++----
 .../runtime/state/MemoryStateBackendTest.java      |  4 +-
 .../runtime/state/OperatorStateBackendTest.java    | 16 ++--
 .../streaming/state/RocksDBAsyncSnapshotTest.java  |  4 +-
 .../api/operators/OperatorSnapshotFinalizer.java   | 10 +--
 9 files changed, 74 insertions(+), 124 deletions(-)
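
For call sites that still reference the removed FutureUtil, the mapping to FutureUtils follows the pattern visible in the hunks below. This is a minimal sketch only; the class and method names here are illustrative and not part of this commit, and it assumes the flink-runtime FutureUtils as extended by this change:

    import org.apache.flink.runtime.concurrent.FutureUtils;

    import java.util.Arrays;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper showing the two migrated call patterns.
    class FutureUtilMigrationSketch {

        static <T> T runSnapshot(RunnableFuture<T> snapshotFuture) throws Exception {
            // Before: org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(snapshotFuture)
            // After:  the same helper now lives in FutureUtils (flink-runtime).
            return FutureUtils.runIfNotDoneAndGet(snapshotFuture);
        }

        static void awaitBoth(CompletableFuture<?> producerResult, CompletableFuture<?> consumerResult) throws Exception {
            // Before: FutureUtil.waitForAll(60_000L, producerResult, consumerResult)
            //         polled plain java.util.concurrent.Future instances.
            // After:  FutureUtils.waitForAll combines CompletableFutures into one future,
            //         and the timeout moves into CompletableFuture#get.
            FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult))
                .get(60_000L, TimeUnit.MILLISECONDS);
        }
    }

Tests that previously submitted Callables to an ExecutorService obtain CompletableFutures instead via CompletableFuture.supplyAsync(CheckedSupplier.unchecked(...), executor), as the updated test hunks below show.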

diff --git a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java b/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
deleted file mode 100644
index 8d196a5..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import org.apache.flink.annotation.Internal;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Simple utility class to work with Java's Futures.
- */
-@Internal
-public class FutureUtil {
-
-	private FutureUtil() {
-		throw new AssertionError();
-	}
-
-	public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
-
-		if (null == future) {
-			return null;
-		}
-
-		if (!future.isDone()) {
-			future.run();
-		}
-
-		return future.get();
-	}
-
-	public static void waitForAll(long timeoutMillis, Future<?>...futures) throws Exception {
-		waitForAll(timeoutMillis, Arrays.asList(futures));
-	}
-
-	public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
-		long startMillis = System.currentTimeMillis();
-		Set<Future<?>> futuresSet = new HashSet<>();
-		futuresSet.addAll(futures);
-
-		while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-			if (futuresSet.isEmpty()) {
-				return;
-			}
-			Iterator<Future<?>> futureIterator = futuresSet.iterator();
-			while (futureIterator.hasNext()) {
-				Future<?> future = futureIterator.next();
-				if (future.isDone()) {
-					future.get();
-					futureIterator.remove();
-				}
-			}
-
-			Thread.sleep(10);
-		}
-
-		if (!futuresSet.isEmpty()) {
-			throw new TimeoutException(String.format("Some of the futures have not finished [%s]", futuresSet));
-		}
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 0f36a3a..4f5f2f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -34,8 +34,10 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -350,6 +352,27 @@ public class FutureUtils {
 	// ------------------------------------------------------------------------
 
 	/**
+	 * Run the given {@code RunnableFuture} if it is not done, and then retrieves its result.
+	 * @param future to run if not done and get
+	 * @param <T> type of the result
+	 * @return the result after running the future
+	 * @throws ExecutionException if a problem occurred
+	 * @throws InterruptedException if the current thread has been interrupted
+	 */
+	public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
+
+		if (null == future) {
+			return null;
+		}
+
+		if (!future.isDone()) {
+			future.run();
+		}
+
+		return future.get();
+	}
+
+	/**
 	 * Run the given action after the completion of the given future. The given future can be
 	 * completed normally or exceptionally. In case of an exceptional completion the, the
 	 * action's exception will be added to the initial exception.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index f129c31..598afe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.util.FutureUtil;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.LambdaUtil;
 
 import org.slf4j.Logger;
@@ -73,7 +73,7 @@ public class StateUtil {
 
 				try {
 					// We attempt to get a result, in case the future completed before cancellation.
-					StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+					StateObject stateObject = FutureUtils.runIfNotDoneAndGet(stateFuture);
 
 					if (null != stateObject) {
 						stateObject.discardState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 82f61ab..2bb3657 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -30,18 +31,21 @@ import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+import org.apache.flink.util.function.CheckedSupplier;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -199,15 +203,18 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 		final PipelinedSubpartition subpartition = createSubpartition();
 
+		TestSubpartitionProducer producer = new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource);
 		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
 		final PipelinedSubpartitionView view = subpartition.createReadView(consumer);
 		consumer.setSubpartitionView(view);
 
-		Future<Boolean> producerResult = executorService.submit(
-			new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
-		Future<Boolean> consumerResult = executorService.submit(consumer);
+		CompletableFuture<Boolean> producerResult = CompletableFuture.supplyAsync(
+			CheckedSupplier.unchecked(producer::call), executorService);
+		CompletableFuture<Boolean> consumerResult = CompletableFuture.supplyAsync(
+			CheckedSupplier.unchecked(consumer::call), executorService);
 
-		waitForAll(60_000L, producerResult, consumerResult);
+		FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult))
+			.get(60_000L, TimeUnit.MILLISECONDS);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 448e989..0d82bff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -57,13 +59,13 @@ import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import scala.Tuple2;
 
-import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -158,27 +160,30 @@ public class LocalInputChannelTest {
 		// Test
 		try {
 			// Submit producer tasks
-			List<Future<?>> results = Lists.newArrayListWithCapacity(
+			List<CompletableFuture<?>> results = Lists.newArrayListWithCapacity(
 				parallelism + 1);
 
 			for (int i = 0; i < parallelism; i++) {
-				results.add(executor.submit(partitionProducers[i]));
+				results.add(CompletableFuture.supplyAsync(
+					CheckedSupplier.unchecked(partitionProducers[i]::call), executor));
 			}
 
 			// Submit consumer
 			for (int i = 0; i < parallelism; i++) {
-				results.add(executor.submit(
-					new TestLocalInputChannelConsumer(
-						i,
-						parallelism,
-						numberOfBuffersPerChannel,
-						networkBuffers.createBufferPool(parallelism, parallelism),
-						partitionManager,
-						new TaskEventDispatcher(),
-						partitionIds)));
+				final TestLocalInputChannelConsumer consumer = new TestLocalInputChannelConsumer(
+					i,
+					parallelism,
+					numberOfBuffersPerChannel,
+					networkBuffers.createBufferPool(parallelism, parallelism),
+					partitionManager,
+					new TaskEventDispatcher(),
+					partitionIds);
+
+				results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(consumer::call), executor));
 			}
 
-			waitForAll(60_000L, results);
+			FutureUtils.waitForAll(results)
+				.get(60_000L, TimeUnit.MILLISECONDS);
 		}
 		finally {
 			networkBuffers.destroyAllBufferPools();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index d531041..3b157ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -27,13 +27,13 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
 
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -121,7 +121,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
 			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
 		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
 
 		try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index bf2df1a..178671b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
@@ -43,7 +44,6 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
 import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
@@ -252,7 +252,7 @@ public class OperatorStateBackendTest {
 		CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
 			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-		FutureUtil.runIfNotDoneAndGet(runnableFuture);
+		FutureUtils.runIfNotDoneAndGet(runnableFuture);
 
 		// make sure that the copy method has been called
 		assertTrue(copyCounter.get() > 0);
@@ -374,7 +374,7 @@ public class OperatorStateBackendTest {
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
 				operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
 		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
 		assertNull(stateHandle);
 	}
@@ -404,7 +404,7 @@ public class OperatorStateBackendTest {
 			RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
 					operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-			SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+			SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
 			stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
 			assertNotNull(stateHandle);
 
@@ -422,7 +422,7 @@ public class OperatorStateBackendTest {
 			expected.remove(1);
 
 			snapshot = operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-			snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+			snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
 
 			stateHandle.discardState();
 			stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
@@ -440,7 +440,7 @@ public class OperatorStateBackendTest {
 			expected.clear();
 
 			snapshot = operatorStateBackend.snapshot(2L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-			snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+			snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
 			if (stateHandle != null) {
 				stateHandle.discardState();
 			}
@@ -509,7 +509,7 @@ public class OperatorStateBackendTest {
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
 			operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
 		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
 
 		try {
@@ -873,7 +873,7 @@ public class OperatorStateBackendTest {
 		RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
 			operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+		SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
 		OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
 
 		try {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c872553..354f735 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -66,7 +67,6 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -414,7 +414,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
 				CheckpointOptions.forCheckpointWithDefaultLocation());
 
 			try {
-				FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+				FutureUtils.runIfNotDoneAndGet(snapshotFuture);
 				fail("Expected an exception to be thrown here.");
 			} catch (ExecutionException e) {
 				Assert.assertEquals(testException, e.getCause());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
index 7a93990..5ed9f6f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
@@ -19,10 +19,10 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.util.FutureUtil;
 
 import javax.annotation.Nonnull;
 
@@ -44,16 +44,16 @@ public class OperatorSnapshotFinalizer {
 		@Nonnull OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException {
 
 		SnapshotResult<KeyedStateHandle> keyedManaged =
-			FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
+			FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
 
 		SnapshotResult<KeyedStateHandle> keyedRaw =
-			FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
+			FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
 
 		SnapshotResult<OperatorStateHandle> operatorManaged =
-			FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
+			FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
 
 		SnapshotResult<OperatorStateHandle> operatorRaw =
-			FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
+			FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
 
 		jobManagerOwnedState = new OperatorSubtaskState(
 			operatorManaged.getJobManagerOwnedSnapshot(),