You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 10:59:56 UTC

[flink] 02/02: [FLINK-28644][tests] Migrate state-processing-api to new collectAsync()

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d788f0a2af3e13ba2b56f6abd88531332cf62593
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 22 15:42:52 2022 +0200

    [FLINK-28644][tests] Migrate state-processing-api to new collectAsync()
---
 .../flink/state/api/SavepointWriterITCase.java     |  52 +++-----
 .../state/api/SavepointWriterWindowITCase.java     |  82 +++++-------
 .../flink/state/api/WritableSavepointITCase.java   |  50 +++----
 .../state/api/WritableSavepointWindowITCase.java   |  86 ++++++------
 .../flink/streaming/util/StreamCollector.java      | 146 ---------------------
 .../streaming/util/StreamCollectorExtension.java   | 146 ---------------------
 6 files changed, 104 insertions(+), 458 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index 28e9e1c3f2f..9c3febdd01e 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -27,10 +27,8 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -44,14 +42,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.util.StreamCollector;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -60,9 +57,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT test for writing savepoints. */
 public class SavepointWriterITCase extends AbstractTestBase {
@@ -82,8 +79,6 @@ public class SavepointWriterITCase extends AbstractTestBase {
     private static final Collection<CurrencyRate> currencyRates =
             Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     @Test
     public void testDefaultStateBackend() throws Exception {
         testStateBootstrapAndModification(null);
@@ -151,7 +146,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        final CloseableIterator<Account> results = stream.collectAsync();
 
         env.fromCollection(currencyRates)
                 .connect(env.fromCollection(currencyRates).broadcast(descriptor))
@@ -159,22 +154,14 @@ public class SavepointWriterITCase extends AbstractTestBase {
                 .uid(CURRENCY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        final StreamGraph streamGraph = env.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
-
-        serializedThrowable.ifPresent(
-                t -> {
-                    throw new AssertionError("Unexpected exception during bootstrapping", t);
-                });
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        env.execute(streamGraph);
+
+        assertThat(results).toIterable().hasSize(3);
+        results.close();
     }
 
     private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath)
@@ -210,26 +197,21 @@ public class SavepointWriterITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        final CloseableIterator<Account> results = stream.collectAsync();
 
         stream.map(acc -> acc.id)
                 .map(new StatefulOperator())
                 .uid(MODIFY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        final StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
+        sEnv.execute(streamGraph);
 
-        Assert.assertFalse(serializedThrowable.isPresent());
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        assertThat(results).toIterable().hasSize(3);
+        results.close();
     }
 
     /** A simple pojo. */
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 89e8edb7bc6..9d61ce14d9c 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -40,21 +38,17 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamCollector;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -64,9 +58,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT Test for writing savepoints to the {@code WindowOperator}. */
 @SuppressWarnings("unchecked")
@@ -78,13 +71,11 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
     private static final Collection<String> WORDS =
             Arrays.asList("hello", "world", "hello", "everyone");
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> STANDARD_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> EVICTOR_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
     private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO =
             new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo();
@@ -130,8 +121,6 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
         return parameterList;
     }
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     private final WindowBootstrap windowBootstrap;
 
     private final WindowStream windowStream;
@@ -182,12 +171,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         .window(TumblingEventTimeWindows.of(Time.milliseconds(5)));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(STANDARD_MATCHER);
     }
 
     @Test
@@ -225,12 +216,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(EVICTOR_MATCHER);
     }
 
     @Test
@@ -269,13 +262,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                                         Time.milliseconds(5), Time.milliseconds(1)));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results =
-                future.get().stream().distinct().collect(Collectors.toList());
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(STANDARD_MATCHER);
     }
 
     @Test
@@ -317,30 +311,22 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, env);
 
-        Collection<Tuple2<String, Integer>> results =
-                future.get().stream().distinct().collect(Collectors.toList());
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(EVICTOR_MATCHER);
     }
 
-    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
-
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        try {
-            Optional<SerializedThrowable> serializedThrowable =
-                    client.submitJob(jobGraph)
-                            .thenCompose(client::requestJobResult)
-                            .get()
-                            .getSerializedThrowable();
-            Assert.assertFalse(serializedThrowable.isPresent());
-        } catch (Throwable t) {
-            Assert.fail("Failed to submit job");
-        }
+    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) throws Exception {
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(savepointPath, true));
+
+        sEnv.execute(streamGraph);
     }
 
     private static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
index c70787d0094..c881096b178 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
@@ -28,11 +28,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -47,14 +45,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.util.StreamCollector;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -63,9 +60,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT test for writing savepoints. */
 public class WritableSavepointITCase extends AbstractTestBase {
@@ -86,8 +83,6 @@ public class WritableSavepointITCase extends AbstractTestBase {
     private static final Collection<CurrencyRate> currencyRates =
             Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     @Test
     public void testFsStateBackend() throws Exception {
         testStateBootstrapAndModification(
@@ -161,7 +156,7 @@ public class WritableSavepointITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        CloseableIterator<Account> results = stream.collectAsync();
 
         sEnv.fromCollection(currencyRates)
                 .connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
@@ -169,22 +164,13 @@ public class WritableSavepointITCase extends AbstractTestBase {
                 .uid(CURRENCY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
-
-        serializedThrowable.ifPresent(
-                t -> {
-                    throw new AssertionError("Unexpected exception during bootstrapping", t);
-                });
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        sEnv.execute(streamGraph);
+
+        assertThat(results).toIterable().hasSize(3);
     }
 
     private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath)
@@ -214,26 +200,20 @@ public class WritableSavepointITCase extends AbstractTestBase {
                         .flatMap(new UpdateAndGetAccount())
                         .uid(ACCOUNT_UID);
 
-        CompletableFuture<Collection<Account>> results = collector.collect(stream);
+        CloseableIterator<Account> results = stream.collectAsync();
 
         stream.map(acc -> acc.id)
                 .map(new StatefulOperator())
                 .uid(MODIFY_UID)
                 .addSink(new DiscardingSink<>());
 
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
                 SavepointRestoreSettings.forPath(savepointPath, false));
 
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        Optional<SerializedThrowable> serializedThrowable =
-                client.submitJob(jobGraph)
-                        .thenCompose(client::requestJobResult)
-                        .get()
-                        .getSerializedThrowable();
+        sEnv.execute(streamGraph);
 
-        Assert.assertFalse(serializedThrowable.isPresent());
-        Assert.assertEquals("Unexpected output", 3, results.get().size());
+        assertThat(results).toIterable().hasSize(3);
     }
 
     /** A simple pojo. */
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
index 7e16b891a00..4115b7645cd 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
@@ -27,10 +27,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -42,21 +40,17 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamCollector;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
 
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -64,11 +58,10 @@ import org.junit.runners.Parameterized;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT Test for writing savepoints to the {@code WindowOperator}. */
 @SuppressWarnings("unchecked")
@@ -80,13 +73,11 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
     private static final Collection<String> WORDS =
             Arrays.asList("hello", "world", "hello", "everyone");
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> STANDARD_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
-    private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER =
-            Matchers.containsInAnyOrder(
-                    Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+    private static final Iterable<? extends Tuple2<String, Integer>> EVICTOR_MATCHER =
+            Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
 
     private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO =
             new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo();
@@ -136,8 +127,6 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
         return parameterList;
     }
 
-    @Rule public StreamCollector collector = new StreamCollector();
-
     private final WindowBootstrap windowBootstrap;
 
     private final WindowStream windowStream;
@@ -185,12 +174,14 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                         .window(TumblingEventTimeWindows.of(Time.milliseconds(5)));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsExactlyInAnyOrderElementsOf(STANDARD_MATCHER);
     }
 
     @Test
@@ -223,12 +214,14 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect results from bootstrapped windows")
+                .containsExactlyInAnyOrderElementsOf(EVICTOR_MATCHER);
     }
 
     @Test
@@ -264,13 +257,16 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                                         Time.milliseconds(5), Time.milliseconds(1)));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertEquals("Incorrect number of results", 15, results.size());
-        Assert.assertThat("Incorrect bootstrap state", new HashSet<>(results), STANDARD_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect number of results")
+                .hasSize(15)
+                .as("Incorrect results from bootstrapped windows")
+                .containsAll(STANDARD_MATCHER);
     }
 
     @Test
@@ -308,30 +304,24 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
                         .evictor(CountEvictor.of(1));
 
         DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
-        CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+        CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
 
         submitJob(savepointPath, sEnv);
 
-        Collection<Tuple2<String, Integer>> results = future.get();
-        Assert.assertEquals("Incorrect number of results", 15, results.size());
-        Assert.assertThat("Incorrect bootstrap state", new HashSet<>(results), EVICTOR_MATCHER);
+        assertThat(future)
+                .toIterable()
+                .as("Incorrect number of results")
+                .hasSize(15)
+                .as("Incorrect results bootstrapped windows")
+                .containsAll(EVICTOR_MATCHER);
     }
 
-    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
-        JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
-        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
-
-        ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
-        try {
-            Optional<SerializedThrowable> serializedThrowable =
-                    client.submitJob(jobGraph)
-                            .thenCompose(client::requestJobResult)
-                            .get()
-                            .getSerializedThrowable();
-            Assert.assertFalse(serializedThrowable.isPresent());
-        } catch (Throwable t) {
-            Assert.fail("Failed to submit job");
-        }
+    private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) throws Exception {
+        StreamGraph streamGraph = sEnv.getStreamGraph();
+        streamGraph.setSavepointRestoreSettings(
+                SavepointRestoreSettings.forPath(savepointPath, true));
+
+        sEnv.execute(streamGraph);
     }
 
     private static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
deleted file mode 100644
index 1d096521e76..00000000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.junit.rules.ExternalResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A simple utility for collecting all the elements in a {@link DataStream}.
- *
- * <pre>{@code
- * public class DataStreamTest {
- *
- * 		{@literal @}Rule
- * 		public StreamCollector collector = new StreamCollector();
- *
- * 		public void test() throws Exception {
- * 		 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * 		 	DataStream<Integer> stream = env.fromElements(1, 2, 3);
- *
- * 		 	CompletableFuture<Collection<Integer>> results = collector.collect(stream);
- * 		 	Assert.assertThat(results.get(), hasItems(1, 2, 3));
- * 		}
- * }
- * }</pre>
- *
- * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
- * in memory. 3) All tasks run within the same JVM.
- */
-@SuppressWarnings("rawtypes")
-public class StreamCollector extends ExternalResource {
-
-    private static final AtomicLong counter = new AtomicLong();
-
-    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
-
-    private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
-
-    private List<Long> ids;
-
-    @Override
-    protected void before() {
-        ids = new ArrayList<>();
-    }
-
-    /**
-     * @return A future that contains all the elements of the DataStream which completes when all
-     *     elements have been processed.
-     */
-    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
-        final long id = counter.getAndIncrement();
-        ids.add(id);
-
-        int parallelism = stream.getParallelism();
-        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-            parallelism = stream.getExecutionEnvironment().getParallelism();
-        }
-
-        CountDownLatch latch = new CountDownLatch(parallelism);
-        latches.put(id, latch);
-
-        Queue<IN> results = new ConcurrentLinkedDeque<>();
-        resultQueues.put(id, results);
-
-        stream.addSink(new CollectingSink<>(id));
-
-        return CompletableFuture.runAsync(
-                        () -> {
-                            try {
-                                latch.await();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException("Failed to collect results");
-                            }
-                        })
-                .thenApply(ignore -> results);
-    }
-
-    @Override
-    protected void after() {
-        for (Long id : ids) {
-            latches.remove(id);
-            resultQueues.remove(id);
-        }
-    }
-
-    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
-
-        private final long id;
-
-        private transient CountDownLatch latch;
-
-        private transient Queue<IN> results;
-
-        private CollectingSink(long id) {
-            this.id = id;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void open(Configuration parameters) throws Exception {
-            latch = StreamCollector.latches.get(id);
-            results = (Queue<IN>) StreamCollector.resultQueues.get(id);
-        }
-
-        @Override
-        public void invoke(IN value, Context context) throws Exception {
-            results.add(value);
-        }
-
-        @Override
-        public void close() throws Exception {
-            latch.countDown();
-        }
-    }
-}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
deleted file mode 100644
index d7be3bf41c1..00000000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A simple utility for collecting all the elements in a {@link DataStream}.
- *
- * <pre>{@code
- * public class DataStreamTest {
- *
- * 		{@literal @}RegisterExtension
- * 		public StreamCollectorExtension collector = new StreamCollectorExtension();
- *
- * 		public void test() throws Exception {
- * 		 	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * 		 	DataStream<Integer> stream = env.fromElements(1, 2, 3);
- *
- * 		 	CompletableFuture<Collection<Integer>> results = collector.collect(stream);
- * 		 	Assert.assertThat(results.get(), hasItems(1, 2, 3));
- * 		}
- * }
- * }</pre>
- *
- * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
- * in memory. 3) All tasks run within the same JVM.
- */
-public class StreamCollectorExtension implements BeforeEachCallback, AfterEachCallback {
-    private static final AtomicLong counter = new AtomicLong();
-
-    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
-
-    private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
-
-    private List<Long> ids;
-
-    @Override
-    public void beforeEach(ExtensionContext context) throws Exception {
-        ids = new ArrayList<>();
-    }
-
-    /**
-     * @return A future that contains all the elements of the DataStream which completes when all
-     *     elements have been processed.
-     */
-    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
-        final long id = counter.getAndIncrement();
-        ids.add(id);
-
-        int parallelism = stream.getParallelism();
-        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-            parallelism = stream.getExecutionEnvironment().getParallelism();
-        }
-
-        CountDownLatch latch = new CountDownLatch(parallelism);
-        latches.put(id, latch);
-
-        Queue<IN> results = new ConcurrentLinkedDeque<>();
-        resultQueues.put(id, results);
-
-        stream.addSink(new StreamCollectorExtension.CollectingSink<>(id));
-
-        return CompletableFuture.runAsync(
-                        () -> {
-                            try {
-                                latch.await();
-                            } catch (InterruptedException e) {
-                                throw new RuntimeException("Failed to collect results");
-                            }
-                        })
-                .thenApply(ignore -> results);
-    }
-
-    @Override
-    public void afterEach(ExtensionContext context) throws Exception {
-        for (Long id : ids) {
-            latches.remove(id);
-            resultQueues.remove(id);
-        }
-    }
-
-    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
-
-        private final long id;
-
-        private transient CountDownLatch latch;
-
-        private transient Queue<IN> results;
-
-        private CollectingSink(long id) {
-            this.id = id;
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void open(Configuration parameters) throws Exception {
-            latch = StreamCollectorExtension.latches.get(id);
-            results = (Queue<IN>) StreamCollectorExtension.resultQueues.get(id);
-        }
-
-        @Override
-        public void invoke(IN value, Context context) throws Exception {
-            results.add(value);
-        }
-
-        @Override
-        public void close() throws Exception {
-            latch.countDown();
-        }
-    }
-}