You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 10:59:56 UTC
[flink] 02/02: [FLINK-28644][tests] Migrate state-processing-api to new collectAsync()
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d788f0a2af3e13ba2b56f6abd88531332cf62593
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Jul 22 15:42:52 2022 +0200
[FLINK-28644][tests] Migrate state-processing-api to new collectAsync()
---
.../flink/state/api/SavepointWriterITCase.java | 52 +++-----
.../state/api/SavepointWriterWindowITCase.java | 82 +++++-------
.../flink/state/api/WritableSavepointITCase.java | 50 +++----
.../state/api/WritableSavepointWindowITCase.java | 86 ++++++------
.../flink/streaming/util/StreamCollector.java | 146 ---------------------
.../streaming/util/StreamCollectorExtension.java | 146 ---------------------
6 files changed, 104 insertions(+), 458 deletions(-)
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index 28e9e1c3f2f..9c3febdd01e 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -27,10 +27,8 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -44,14 +42,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.util.StreamCollector;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
@@ -60,9 +57,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** IT test for writing savepoints. */
public class SavepointWriterITCase extends AbstractTestBase {
@@ -82,8 +79,6 @@ public class SavepointWriterITCase extends AbstractTestBase {
private static final Collection<CurrencyRate> currencyRates =
Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
- @Rule public StreamCollector collector = new StreamCollector();
-
@Test
public void testDefaultStateBackend() throws Exception {
testStateBootstrapAndModification(null);
@@ -151,7 +146,7 @@ public class SavepointWriterITCase extends AbstractTestBase {
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);
- CompletableFuture<Collection<Account>> results = collector.collect(stream);
+ final CloseableIterator<Account> results = stream.collectAsync();
env.fromCollection(currencyRates)
.connect(env.fromCollection(currencyRates).broadcast(descriptor))
@@ -159,22 +154,14 @@ public class SavepointWriterITCase extends AbstractTestBase {
.uid(CURRENCY_UID)
.addSink(new DiscardingSink<>());
- JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(
+ final StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- Optional<SerializedThrowable> serializedThrowable =
- client.submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get()
- .getSerializedThrowable();
-
- serializedThrowable.ifPresent(
- t -> {
- throw new AssertionError("Unexpected exception during bootstrapping", t);
- });
- Assert.assertEquals("Unexpected output", 3, results.get().size());
+ env.execute(streamGraph);
+
+ assertThat(results).toIterable().hasSize(3);
+ results.close();
}
private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath)
@@ -210,26 +197,21 @@ public class SavepointWriterITCase extends AbstractTestBase {
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);
- CompletableFuture<Collection<Account>> results = collector.collect(stream);
+ final CloseableIterator<Account> results = stream.collectAsync();
stream.map(acc -> acc.id)
.map(new StatefulOperator())
.uid(MODIFY_UID)
.addSink(new DiscardingSink<>());
- JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(
+ final StreamGraph streamGraph = sEnv.getStreamGraph();
+ streamGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- Optional<SerializedThrowable> serializedThrowable =
- client.submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get()
- .getSerializedThrowable();
+ sEnv.execute(streamGraph);
- Assert.assertFalse(serializedThrowable.isPresent());
- Assert.assertEquals("Unexpected output", 3, results.get().size());
+ assertThat(results).toIterable().hasSize(3);
+ results.close();
}
/** A simple pojo. */
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
index 89e8edb7bc6..9d61ce14d9c 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterWindowITCase.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -40,21 +38,17 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -64,9 +58,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** IT Test for writing savepoints to the {@code WindowOperator}. */
@SuppressWarnings("unchecked")
@@ -78,13 +71,11 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
private static final Collection<String> WORDS =
Arrays.asList("hello", "world", "hello", "everyone");
- private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER =
- Matchers.containsInAnyOrder(
- Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+ private static final Iterable<? extends Tuple2<String, Integer>> STANDARD_MATCHER =
+ Arrays.asList(Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
- private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER =
- Matchers.containsInAnyOrder(
- Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+ private static final Iterable<? extends Tuple2<String, Integer>> EVICTOR_MATCHER =
+ Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO =
new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo();
@@ -130,8 +121,6 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
return parameterList;
}
- @Rule public StreamCollector collector = new StreamCollector();
-
private final WindowBootstrap windowBootstrap;
private final WindowStream windowStream;
@@ -182,12 +171,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
.window(TumblingEventTimeWindows.of(Time.milliseconds(5)));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, env);
- Collection<Tuple2<String, Integer>> results = future.get();
- Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect results from bootstrapped windows")
+ .containsAll(STANDARD_MATCHER);
}
@Test
@@ -225,12 +216,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
.evictor(CountEvictor.of(1));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, env);
- Collection<Tuple2<String, Integer>> results = future.get();
- Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect results from bootstrapped windows")
+ .containsAll(EVICTOR_MATCHER);
}
@Test
@@ -269,13 +262,14 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
Time.milliseconds(5), Time.milliseconds(1)));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, env);
- Collection<Tuple2<String, Integer>> results =
- future.get().stream().distinct().collect(Collectors.toList());
- Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect results from bootstrapped windows")
+ .containsAll(STANDARD_MATCHER);
}
@Test
@@ -317,30 +311,22 @@ public class SavepointWriterWindowITCase extends AbstractTestBase {
.evictor(CountEvictor.of(1));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, env);
- Collection<Tuple2<String, Integer>> results =
- future.get().stream().distinct().collect(Collectors.toList());
- Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect results from bootstrapped windows")
+ .containsAll(EVICTOR_MATCHER);
}
- private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
- JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
-
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- try {
- Optional<SerializedThrowable> serializedThrowable =
- client.submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get()
- .getSerializedThrowable();
- Assert.assertFalse(serializedThrowable.isPresent());
- } catch (Throwable t) {
- Assert.fail("Failed to submit job");
- }
+ private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) throws Exception {
+ StreamGraph streamGraph = sEnv.getStreamGraph();
+ streamGraph.setSavepointRestoreSettings(
+ SavepointRestoreSettings.forPath(savepointPath, true));
+
+ sEnv.execute(streamGraph);
}
private static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
index c70787d0094..c881096b178 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointITCase.java
@@ -28,11 +28,9 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -47,14 +45,13 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.util.StreamCollector;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
@@ -63,9 +60,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** IT test for writing savepoints. */
public class WritableSavepointITCase extends AbstractTestBase {
@@ -86,8 +83,6 @@ public class WritableSavepointITCase extends AbstractTestBase {
private static final Collection<CurrencyRate> currencyRates =
Arrays.asList(new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));
- @Rule public StreamCollector collector = new StreamCollector();
-
@Test
public void testFsStateBackend() throws Exception {
testStateBootstrapAndModification(
@@ -161,7 +156,7 @@ public class WritableSavepointITCase extends AbstractTestBase {
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);
- CompletableFuture<Collection<Account>> results = collector.collect(stream);
+ CloseableIterator<Account> results = stream.collectAsync();
sEnv.fromCollection(currencyRates)
.connect(sEnv.fromCollection(currencyRates).broadcast(descriptor))
@@ -169,22 +164,13 @@ public class WritableSavepointITCase extends AbstractTestBase {
.uid(CURRENCY_UID)
.addSink(new DiscardingSink<>());
- JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(
+ StreamGraph streamGraph = sEnv.getStreamGraph();
+ streamGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- Optional<SerializedThrowable> serializedThrowable =
- client.submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get()
- .getSerializedThrowable();
-
- serializedThrowable.ifPresent(
- t -> {
- throw new AssertionError("Unexpected exception during bootstrapping", t);
- });
- Assert.assertEquals("Unexpected output", 3, results.get().size());
+ sEnv.execute(streamGraph);
+
+ assertThat(results).toIterable().hasSize(3);
}
private void modifySavepoint(StateBackend backend, String savepointPath, String modifyPath)
@@ -214,26 +200,20 @@ public class WritableSavepointITCase extends AbstractTestBase {
.flatMap(new UpdateAndGetAccount())
.uid(ACCOUNT_UID);
- CompletableFuture<Collection<Account>> results = collector.collect(stream);
+ CloseableIterator<Account> results = stream.collectAsync();
stream.map(acc -> acc.id)
.map(new StatefulOperator())
.uid(MODIFY_UID)
.addSink(new DiscardingSink<>());
- JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(
+ StreamGraph streamGraph = sEnv.getStreamGraph();
+ streamGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- Optional<SerializedThrowable> serializedThrowable =
- client.submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get()
- .getSerializedThrowable();
+ sEnv.execute(streamGraph);
- Assert.assertFalse(serializedThrowable.isPresent());
- Assert.assertEquals("Unexpected output", 3, results.get().size());
+ assertThat(results).toIterable().hasSize(3);
}
/** A simple pojo. */
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
index 7e16b891a00..4115b7645cd 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/WritableSavepointWindowITCase.java
@@ -27,10 +27,8 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
-import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
@@ -42,21 +40,17 @@ import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.util.StreamCollector;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.SerializedThrowable;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -64,11 +58,10 @@ import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
/** IT Test for writing savepoints to the {@code WindowOperator}. */
@SuppressWarnings("unchecked")
@@ -80,13 +73,11 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
private static final Collection<String> WORDS =
Arrays.asList("hello", "world", "hello", "everyone");
- private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> STANDARD_MATCHER =
- Matchers.containsInAnyOrder(
- Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+ private static final Iterable<? extends Tuple2<String, Integer>> STANDARD_MATCHER =
+ Arrays.asList(Tuple2.of("hello", 2), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
- private static final Matcher<Iterable<? extends Tuple2<String, Integer>>> EVICTOR_MATCHER =
- Matchers.containsInAnyOrder(
- Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
+ private static final Iterable<? extends Tuple2<String, Integer>> EVICTOR_MATCHER =
+ Arrays.asList(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("everyone", 1));
private static final TypeInformation<Tuple2<String, Integer>> TUPLE_TYPE_INFO =
new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo();
@@ -136,8 +127,6 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
return parameterList;
}
- @Rule public StreamCollector collector = new StreamCollector();
-
private final WindowBootstrap windowBootstrap;
private final WindowStream windowStream;
@@ -185,12 +174,14 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
.window(TumblingEventTimeWindows.of(Time.milliseconds(5)));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, sEnv);
- Collection<Tuple2<String, Integer>> results = future.get();
- Assert.assertThat("Incorrect results from bootstrapped windows", results, STANDARD_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect results from bootstrapped windows")
+ .containsExactlyInAnyOrderElementsOf(STANDARD_MATCHER);
}
@Test
@@ -223,12 +214,14 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
.evictor(CountEvictor.of(1));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, sEnv);
- Collection<Tuple2<String, Integer>> results = future.get();
- Assert.assertThat("Incorrect results from bootstrapped windows", results, EVICTOR_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect results from bootstrapped windows")
+ .containsExactlyInAnyOrderElementsOf(EVICTOR_MATCHER);
}
@Test
@@ -264,13 +257,16 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
Time.milliseconds(5), Time.milliseconds(1)));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, sEnv);
- Collection<Tuple2<String, Integer>> results = future.get();
- Assert.assertEquals("Incorrect number of results", 15, results.size());
- Assert.assertThat("Incorrect bootstrap state", new HashSet<>(results), STANDARD_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect number of results")
+ .hasSize(15)
+ .as("Incorrect results from bootstrapped windows")
+ .containsAll(STANDARD_MATCHER);
}
@Test
@@ -308,30 +304,24 @@ public class WritableSavepointWindowITCase extends AbstractTestBase {
.evictor(CountEvictor.of(1));
DataStream<Tuple2<String, Integer>> windowed = windowStream.window(stream).uid(UID);
- CompletableFuture<Collection<Tuple2<String, Integer>>> future = collector.collect(windowed);
+ CloseableIterator<Tuple2<String, Integer>> future = windowed.collectAsync();
submitJob(savepointPath, sEnv);
- Collection<Tuple2<String, Integer>> results = future.get();
- Assert.assertEquals("Incorrect number of results", 15, results.size());
- Assert.assertThat("Incorrect bootstrap state", new HashSet<>(results), EVICTOR_MATCHER);
+ assertThat(future)
+ .toIterable()
+ .as("Incorrect number of results")
+ .hasSize(15)
+ .as("Incorrect results bootstrapped windows")
+ .containsAll(EVICTOR_MATCHER);
}
- private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) {
- JobGraph jobGraph = sEnv.getStreamGraph().getJobGraph();
- jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
-
- ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
- try {
- Optional<SerializedThrowable> serializedThrowable =
- client.submitJob(jobGraph)
- .thenCompose(client::requestJobResult)
- .get()
- .getSerializedThrowable();
- Assert.assertFalse(serializedThrowable.isPresent());
- } catch (Throwable t) {
- Assert.fail("Failed to submit job");
- }
+ private void submitJob(String savepointPath, StreamExecutionEnvironment sEnv) throws Exception {
+ StreamGraph streamGraph = sEnv.getStreamGraph();
+ streamGraph.setSavepointRestoreSettings(
+ SavepointRestoreSettings.forPath(savepointPath, true));
+
+ sEnv.execute(streamGraph);
}
private static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
deleted file mode 100644
index 1d096521e76..00000000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.junit.rules.ExternalResource;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A simple utility for collecting all the elements in a {@link DataStream}.
- *
- * <pre>{@code
- * public class DataStreamTest {
- *
- * {@literal @}Rule
- * public StreamCollector collector = new StreamCollector();
- *
- * public void test() throws Exception {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * DataStream<Integer> stream = env.fromElements(1, 2, 3);
- *
- * CompletableFuture<Collection<Integer>> results = collector.collect(stream);
- * Assert.assertThat(results.get(), hasItems(1, 2, 3));
- * }
- * }
- * }</pre>
- *
- * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
- * in memory. 3) All tasks run within the same JVM.
- */
-@SuppressWarnings("rawtypes")
-public class StreamCollector extends ExternalResource {
-
- private static final AtomicLong counter = new AtomicLong();
-
- private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
-
- private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
-
- private List<Long> ids;
-
- @Override
- protected void before() {
- ids = new ArrayList<>();
- }
-
- /**
- * @return A future that contains all the elements of the DataStream which completes when all
- * elements have been processed.
- */
- public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
- final long id = counter.getAndIncrement();
- ids.add(id);
-
- int parallelism = stream.getParallelism();
- if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
- parallelism = stream.getExecutionEnvironment().getParallelism();
- }
-
- CountDownLatch latch = new CountDownLatch(parallelism);
- latches.put(id, latch);
-
- Queue<IN> results = new ConcurrentLinkedDeque<>();
- resultQueues.put(id, results);
-
- stream.addSink(new CollectingSink<>(id));
-
- return CompletableFuture.runAsync(
- () -> {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException("Failed to collect results");
- }
- })
- .thenApply(ignore -> results);
- }
-
- @Override
- protected void after() {
- for (Long id : ids) {
- latches.remove(id);
- resultQueues.remove(id);
- }
- }
-
- private static class CollectingSink<IN> extends RichSinkFunction<IN> {
-
- private final long id;
-
- private transient CountDownLatch latch;
-
- private transient Queue<IN> results;
-
- private CollectingSink(long id) {
- this.id = id;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open(Configuration parameters) throws Exception {
- latch = StreamCollector.latches.get(id);
- results = (Queue<IN>) StreamCollector.resultQueues.get(id);
- }
-
- @Override
- public void invoke(IN value, Context context) throws Exception {
- results.add(value);
- }
-
- @Override
- public void close() throws Exception {
- latch.countDown();
- }
- }
-}
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
deleted file mode 100644
index d7be3bf41c1..00000000000
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollectorExtension.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * A simple utility for collecting all the elements in a {@link DataStream}.
- *
- * <pre>{@code
- * public class DataStreamTest {
- *
- * {@literal @}RegisterExtension
- * public StreamCollectorExtension collector = new StreamCollectorExtension();
- *
- * public void test() throws Exception {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * DataStream<Integer> stream = env.fromElements(1, 2, 3);
- *
- * CompletableFuture<Collection<Integer>> results = collector.collect(stream);
- * Assert.assertThat(results.get(), hasItems(1, 2, 3));
- * }
- * }
- * }</pre>
- *
- * <p><b>Note:</b> The stream collector assumes: 1) The stream is bounded. 2) All elements will fit
- * in memory. 3) All tasks run within the same JVM.
- */
-public class StreamCollectorExtension implements BeforeEachCallback, AfterEachCallback {
- private static final AtomicLong counter = new AtomicLong();
-
- private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
-
- private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
-
- private List<Long> ids;
-
- @Override
- public void beforeEach(ExtensionContext context) throws Exception {
- ids = new ArrayList<>();
- }
-
- /**
- * @return A future that contains all the elements of the DataStream which completes when all
- * elements have been processed.
- */
- public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
- final long id = counter.getAndIncrement();
- ids.add(id);
-
- int parallelism = stream.getParallelism();
- if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
- parallelism = stream.getExecutionEnvironment().getParallelism();
- }
-
- CountDownLatch latch = new CountDownLatch(parallelism);
- latches.put(id, latch);
-
- Queue<IN> results = new ConcurrentLinkedDeque<>();
- resultQueues.put(id, results);
-
- stream.addSink(new StreamCollectorExtension.CollectingSink<>(id));
-
- return CompletableFuture.runAsync(
- () -> {
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new RuntimeException("Failed to collect results");
- }
- })
- .thenApply(ignore -> results);
- }
-
- @Override
- public void afterEach(ExtensionContext context) throws Exception {
- for (Long id : ids) {
- latches.remove(id);
- resultQueues.remove(id);
- }
- }
-
- private static class CollectingSink<IN> extends RichSinkFunction<IN> {
-
- private final long id;
-
- private transient CountDownLatch latch;
-
- private transient Queue<IN> results;
-
- private CollectingSink(long id) {
- this.id = id;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void open(Configuration parameters) throws Exception {
- latch = StreamCollectorExtension.latches.get(id);
- results = (Queue<IN>) StreamCollectorExtension.resultQueues.get(id);
- }
-
- @Override
- public void invoke(IN value, Context context) throws Exception {
- results.add(value);
- }
-
- @Override
- public void close() throws Exception {
- latch.countDown();
- }
- }
-}