Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:38 UTC
[flink] 08/10: [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 09a7a66b7313fea64817fe960a8da1265b428efc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 2 15:23:27 2020 +0200
[refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.
This supports simple ways of pulling bounded streams to the client, as well as a defined number of
elements from an unbounded stream.
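For illustration only (not part of the commit), the new helpers can be used roughly as
follows; the sources and job names in this sketch are made up:

    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CollectExamples {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // bounded stream: blocks until the job is FINISHED, then returns all elements
            DataStream<Integer> bounded = env.fromElements(1, 2, 3, 4, 5);
            List<Integer> all = DataStreamUtils.collectBoundedStream(bounded, "Bounded Collect Example");

            // unbounded stream: collects the first 10 elements, then cancels the job
            DataStream<String> unbounded = env.socketTextStream("localhost", 9999);
            List<String> firstTen = DataStreamUtils.collectUnboundedStream(unbounded, 10, "Unbounded Collect Example");
        }
    }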
---
.../streaming/api/datastream/DataStreamUtils.java | 133 +++++++++++++++++++--
1 file changed, 123 insertions(+), 10 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index 91d132c..45f4ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -31,9 +31,14 @@ import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A collection of utilities for {@link DataStream DataStreams}.
*/
@@ -41,33 +46,124 @@ import java.util.UUID;
public final class DataStreamUtils {
/**
- * Returns an iterator to iterate over the elements of the DataStream.
- * @return The iterator
+ * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+ * of the given DataStream.
+ *
+ * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+ * and the events from the stream are polled back to this application process and thread through
+ * Flink's REST API.
*/
public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+ return collect(stream, "Data Stream Collect");
+ }
+
+ /**
+ * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+ * of the given DataStream.
+ *
+ * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+ * and the events from the stream are polled back to this application process and thread through
+ * Flink's REST API.
+ */
+ public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream, String executionJobName) {
+ try {
+ return collectWithClient(stream, executionJobName).iterator;
+ } catch (Exception e) {
+ // this "wrap as unchecked" step is here only to keep the exception signature
+ // backwards compatible.
+ throw new RuntimeException("Failed to execute data stream", e);
+ }
+ }
+
+ /**
+ * Starts the execution of the program and returns an iterator to read the result of the
+ * given data stream, plus a {@link JobClient} to interact with the application execution.
+ */
+ public static <OUT> ClientAndIterator<OUT> collectWithClient(
+ DataStream<OUT> stream,
+ String jobExecutionName) throws Exception {
+
TypeSerializer<OUT> serializer = stream.getType().createSerializer(
- stream.getExecutionEnvironment().getConfig());
+ stream.getExecutionEnvironment().getConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString();
StreamExecutionEnvironment env = stream.getExecutionEnvironment();
CollectSinkOperatorFactory<OUT> factory = new CollectSinkOperatorFactory<>(serializer, accumulatorName);
CollectSinkOperator<OUT> operator = (CollectSinkOperator<OUT>) factory.getOperator();
CollectResultIterator<OUT> iterator = new CollectResultIterator<>(
- operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
+ operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
CollectStreamSink<OUT> sink = new CollectStreamSink<>(stream, factory);
sink.name("Data stream collect sink");
env.addOperator(sink.getTransformation());
- try {
- JobClient jobClient = env.executeAsync("Data Stream Collect");
- iterator.setJobClient(jobClient);
- } catch (Exception e) {
- throw new RuntimeException("Failed to execute data stream", e);
+ final JobClient jobClient = env.executeAsync(jobExecutionName);
+ iterator.setJobClient(jobClient);
+
+ return new ClientAndIterator<>(jobClient, iterator);
+ }
+
+ /**
+ * Collects the contents of the given DataStream into a list, assuming that the stream is a bounded stream.
+ *
+ * <p>This method blocks until the job execution is complete. By the time the method returns, the
+ * job will have reached its FINISHED status.
+ *
+ * <p>Note that if the stream is unbounded, this method will never return and might fail with an
+ * Out-of-Memory Error because it attempts to collect an infinite stream into a list.
+ *
+ * @throws Exception Exceptions that occur during the execution are forwarded.
+ */
+ public static <E> List<E> collectBoundedStream(DataStream<E> stream, String jobName) throws Exception {
+ final ArrayList<E> list = new ArrayList<>();
+ final Iterator<E> iter = collectWithClient(stream, jobName).iterator;
+ while (iter.hasNext()) {
+ list.add(iter.next());
+ }
+ list.trimToSize();
+ return list;
+ }
+
+ /**
+ * Triggers execution of the DataStream application and collects the given number of records from the stream.
+ * After the records are received, the execution is canceled.
+ */
+ public static <E> List<E> collectUnboundedStream(DataStream<E> stream, int numElements, String jobName) throws Exception {
+ final ClientAndIterator<E> clientAndIterator = collectWithClient(stream, jobName);
+ final List<E> result = collectRecordsFromUnboundedStream(clientAndIterator, numElements);
+
+ // cancel the job now that we have received enough elements
+ clientAndIterator.client.cancel().get();
+
+ return result;
+ }
+
+ public static <E> List<E> collectRecordsFromUnboundedStream(
+ final ClientAndIterator<E> client,
+ final int numElements) {
+
+ checkNotNull(client, "client");
+ checkArgument(numElements > 0, "numElements must be > 0");
+
+ final ArrayList<E> result = new ArrayList<>(numElements);
+ final Iterator<E> iterator = client.iterator;
+
+ while (iterator.hasNext()) {
+ result.add(iterator.next());
+ if (result.size() == numElements) {
+ return result;
+ }
}
- return iterator;
+ throw new IllegalArgumentException(String.format(
+ "The stream ended before reaching the requested %d records. Only %d records were received.",
+ numElements, result.size()));
}
+ // ------------------------------------------------------------------------
+ // Deriving a KeyedStream from a stream already partitioned by key
+ // without a shuffle
+ // ------------------------------------------------------------------------
+
/**
* Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
* {@link KeySelector}.
@@ -129,4 +225,21 @@ public final class DataStreamUtils {
* Private constructor to prevent instantiation.
*/
private DataStreamUtils() {}
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A pair of an {@link Iterator} to receive results from a streaming application and a
+ * {@link JobClient} to interact with the program.
+ */
+ public static final class ClientAndIterator<E> {
+
+ public final JobClient client;
+ public final Iterator<E> iterator;
+
+ ClientAndIterator(JobClient client, Iterator<E> iterator) {
+ this.client = checkNotNull(client);
+ this.iterator = checkNotNull(iterator);
+ }
+ }
}
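For completeness, a rough usage sketch (not from the commit) of collectWithClient() and
ClientAndIterator, for callers that want to keep control of the running job. It assumes
the code runs inside a method that declares "throws Exception" and that "stream" is an
unbounded DataStream<String>:

    DataStreamUtils.ClientAndIterator<String> cai =
            DataStreamUtils.collectWithClient(stream, "Collect With Client Example");

    // read a fixed number of records from the running job
    List<String> firstFive = DataStreamUtils.collectRecordsFromUnboundedStream(cai, 5);

    // the caller owns the job and cancels it explicitly once enough records have arrived
    cai.client.cancel().get();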