You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/01/17 10:47:16 UTC
[flink] branch master updated: [FLINK-11069][core] Merge FutureUtil
into FutureUtils
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 322e479 [FLINK-11069][core] Merge FutureUtil into FutureUtils
322e479 is described below
commit 322e479d540a74e45a9b5e38eb17b4eb3a4c6e9e
Author: ZILI CHEN <wa...@gmail.com>
AuthorDate: Thu Jan 17 18:47:09 2019 +0800
[FLINK-11069][core] Merge FutureUtil into FutureUtils
---
.../java/org/apache/flink/util/FutureUtil.java | 85 ----------------------
.../flink/runtime/concurrent/FutureUtils.java | 23 ++++++
.../org/apache/flink/runtime/state/StateUtil.java | 4 +-
.../partition/PipelinedSubpartitionTest.java | 19 +++--
.../partition/consumer/LocalInputChannelTest.java | 33 +++++----
.../runtime/state/MemoryStateBackendTest.java | 4 +-
.../runtime/state/OperatorStateBackendTest.java | 16 ++--
.../streaming/state/RocksDBAsyncSnapshotTest.java | 4 +-
.../api/operators/OperatorSnapshotFinalizer.java | 10 +--
9 files changed, 74 insertions(+), 124 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java b/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
deleted file mode 100644
index 8d196a5..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/FutureUtil.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.util;
-
-import org.apache.flink.annotation.Internal;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Simple utility class to work with Java's Futures.
- */
-@Internal
-public class FutureUtil {
-
- private FutureUtil() {
- throw new AssertionError();
- }
-
- public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
-
- if (null == future) {
- return null;
- }
-
- if (!future.isDone()) {
- future.run();
- }
-
- return future.get();
- }
-
- public static void waitForAll(long timeoutMillis, Future<?>...futures) throws Exception {
- waitForAll(timeoutMillis, Arrays.asList(futures));
- }
-
- public static void waitForAll(long timeoutMillis, Collection<Future<?>> futures) throws Exception {
- long startMillis = System.currentTimeMillis();
- Set<Future<?>> futuresSet = new HashSet<>();
- futuresSet.addAll(futures);
-
- while (System.currentTimeMillis() < startMillis + timeoutMillis) {
- if (futuresSet.isEmpty()) {
- return;
- }
- Iterator<Future<?>> futureIterator = futuresSet.iterator();
- while (futureIterator.hasNext()) {
- Future<?> future = futureIterator.next();
- if (future.isDone()) {
- future.get();
- futureIterator.remove();
- }
- }
-
- Thread.sleep(10);
- }
-
- if (!futuresSet.isEmpty()) {
- throw new TimeoutException(String.format("Some of the futures have not finished [%s]", futuresSet));
- }
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 0f36a3a..4f5f2f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -34,8 +34,10 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -350,6 +352,27 @@ public class FutureUtils {
// ------------------------------------------------------------------------
/**
+ * Runs the given {@code RunnableFuture} if it is not done, and then retrieves its result.
+ * @param future to run if not done and get; may be {@code null}, in which case nothing is run
+ * @param <T> type of the result
+ * @return the result after running the future, or {@code null} if the given future is {@code null}
+ * @throws ExecutionException if a problem occurred
+ * @throws InterruptedException if the current thread has been interrupted
+ */
+ public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) throws ExecutionException, InterruptedException {
+
+ if (null == future) {
+ return null;
+ }
+
+ if (!future.isDone()) {
+ future.run();
+ }
+
+ return future.get();
+ }
+
+ /**
* Run the given action after the completion of the given future. The given future can be
* completed normally or exceptionally. In case of an exceptional completion, the
* action's exception will be added to the initial exception.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
index f129c31..598afe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.util.FutureUtil;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.LambdaUtil;
import org.slf4j.Logger;
@@ -73,7 +73,7 @@ public class StateUtil {
try {
// We attempt to get a result, in case the future completed before cancellation.
- StateObject stateObject = FutureUtil.runIfNotDoneAndGet(stateFuture);
+ StateObject stateObject = FutureUtils.runIfNotDoneAndGet(stateFuture);
if (null != stateObject) {
stateObject.discardState();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 82f61ab..2bb3657 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -30,18 +31,21 @@ import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
+import org.apache.flink.util.function.CheckedSupplier;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.util.FutureUtil.waitForAll;
import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -199,15 +203,18 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
final PipelinedSubpartition subpartition = createSubpartition();
+ TestSubpartitionProducer producer = new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource);
TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(isSlowConsumer, consumerCallback);
final PipelinedSubpartitionView view = subpartition.createReadView(consumer);
consumer.setSubpartitionView(view);
- Future<Boolean> producerResult = executorService.submit(
- new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
- Future<Boolean> consumerResult = executorService.submit(consumer);
+ CompletableFuture<Boolean> producerResult = CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(producer::call), executorService);
+ CompletableFuture<Boolean> consumerResult = CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(consumer::call), executorService);
- waitForAll(60_000L, producerResult, consumerResult);
+ FutureUtils.waitForAll(Arrays.asList(producerResult, consumerResult))
+ .get(60_000L, TimeUnit.MILLISECONDS);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 448e989..0d82bff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -57,13 +59,13 @@ import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import scala.Tuple2;
-import static org.apache.flink.util.FutureUtil.waitForAll;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@@ -158,27 +160,30 @@ public class LocalInputChannelTest {
// Test
try {
// Submit producer tasks
- List<Future<?>> results = Lists.newArrayListWithCapacity(
+ List<CompletableFuture<?>> results = Lists.newArrayListWithCapacity(
parallelism + 1);
for (int i = 0; i < parallelism; i++) {
- results.add(executor.submit(partitionProducers[i]));
+ results.add(CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(partitionProducers[i]::call), executor));
}
// Submit consumer
for (int i = 0; i < parallelism; i++) {
- results.add(executor.submit(
- new TestLocalInputChannelConsumer(
- i,
- parallelism,
- numberOfBuffersPerChannel,
- networkBuffers.createBufferPool(parallelism, parallelism),
- partitionManager,
- new TaskEventDispatcher(),
- partitionIds)));
+ final TestLocalInputChannelConsumer consumer = new TestLocalInputChannelConsumer(
+ i,
+ parallelism,
+ numberOfBuffersPerChannel,
+ networkBuffers.createBufferPool(parallelism, parallelism),
+ partitionManager,
+ new TaskEventDispatcher(),
+ partitionIds);
+
+ results.add(CompletableFuture.supplyAsync(CheckedSupplier.unchecked(consumer::call), executor));
}
- waitForAll(60_000L, results);
+ FutureUtils.waitForAll(results)
+ .get(60_000L, TimeUnit.MILLISECONDS);
}
finally {
networkBuffers.destroyAllBufferPools();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index d531041..3b157ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -27,13 +27,13 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
import org.junit.Assert;
import org.junit.Ignore;
@@ -121,7 +121,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+ SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index bf2df1a..178671b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
@@ -43,7 +44,6 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
@@ -252,7 +252,7 @@ public class OperatorStateBackendTest {
CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- FutureUtil.runIfNotDoneAndGet(runnableFuture);
+ FutureUtils.runIfNotDoneAndGet(runnableFuture);
// make sure that the copy method has been called
assertTrue(copyCounter.get() > 0);
@@ -374,7 +374,7 @@ public class OperatorStateBackendTest {
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+ SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
assertNull(stateHandle);
}
@@ -404,7 +404,7 @@ public class OperatorStateBackendTest {
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+ SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
assertNotNull(stateHandle);
@@ -422,7 +422,7 @@ public class OperatorStateBackendTest {
expected.remove(1);
snapshot = operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+ snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
stateHandle.discardState();
stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
@@ -440,7 +440,7 @@ public class OperatorStateBackendTest {
expected.clear();
snapshot = operatorStateBackend.snapshot(2L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+ snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
if (stateHandle != null) {
stateHandle.discardState();
}
@@ -509,7 +509,7 @@ public class OperatorStateBackendTest {
RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot);
+ SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(snapshot);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
try {
@@ -873,7 +873,7 @@ public class OperatorStateBackendTest {
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
- SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture);
+ SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtils.runIfNotDoneAndGet(runnableFuture);
OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot();
try {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c872553..354f735 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -66,7 +67,6 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
@@ -414,7 +414,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
CheckpointOptions.forCheckpointWithDefaultLocation());
try {
- FutureUtil.runIfNotDoneAndGet(snapshotFuture);
+ FutureUtils.runIfNotDoneAndGet(snapshotFuture);
fail("Expected an exception to be thrown here.");
} catch (ExecutionException e) {
Assert.assertEquals(testException, e.getCause());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
index 7a93990..5ed9f6f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OperatorSnapshotFinalizer.java
@@ -19,10 +19,10 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.util.FutureUtil;
import javax.annotation.Nonnull;
@@ -44,16 +44,16 @@ public class OperatorSnapshotFinalizer {
@Nonnull OperatorSnapshotFutures snapshotFutures) throws ExecutionException, InterruptedException {
SnapshotResult<KeyedStateHandle> keyedManaged =
- FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
+ FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
SnapshotResult<KeyedStateHandle> keyedRaw =
- FutureUtil.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
+ FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());
SnapshotResult<OperatorStateHandle> operatorManaged =
- FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
+ FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());
SnapshotResult<OperatorStateHandle> operatorRaw =
- FutureUtil.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
+ FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
jobManagerOwnedState = new OperatorSubtaskState(
operatorManaged.getJobManagerOwnedSnapshot(),