You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:38 UTC

[flink] 08/10: [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09a7a66b7313fea64817fe960a8da1265b428efc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 2 15:23:27 2020 +0200

    [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.
    
    This supports simple ways of pulling bounded streams to the client, as well as a defined number of
    elements from an unbounded stream.
---
 .../streaming/api/datastream/DataStreamUtils.java  | 133 +++++++++++++++++++--
 1 file changed, 123 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index 91d132c..45f4ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -31,9 +31,14 @@ import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A collection of utilities for {@link DataStream DataStreams}.
  */
@@ -41,33 +46,124 @@ import java.util.UUID;
 public final class DataStreamUtils {
 
 	/**
-	 * Returns an iterator to iterate over the elements of the DataStream.
-	 * @return The iterator
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
 	 */
 	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+		return collect(stream, "Data Stream Collect");
+	}
+
+	/**
+	 * Runs the streaming dataflow on the target environment under the given job name and returns an
+	 * iterator over the elements of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target
+	 * environment, and the events from the stream are polled back to this application process and
+	 * thread through Flink's REST API. Failures to start the execution surface as RuntimeException.
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream, String executionJobName) {
+		try {
+			final ClientAndIterator<OUT> clientAndIterator = collectWithClient(stream, executionJobName);
+			return clientAndIterator.iterator;
+		} catch (Exception e) {
+			// rethrown unchecked, so that the signature of this method stays backwards compatible
+			throw new RuntimeException("Failed to execute data stream", e);
+		}
+	}
+
+	/**
+	 * Starts the execution of the program and returns an iterator to read the result of the
+	 * given data stream, plus a {@link JobClient} to interact with the application execution.
+	 */
+	public static <OUT> ClientAndIterator<OUT> collectWithClient(
+			DataStream<OUT> stream,
+			String jobExecutionName) throws Exception {
+
 		TypeSerializer<OUT> serializer = stream.getType().createSerializer(
-			stream.getExecutionEnvironment().getConfig());
+				stream.getExecutionEnvironment().getConfig());
 		String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString();
 
 		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
 		CollectSinkOperatorFactory<OUT> factory = new CollectSinkOperatorFactory<>(serializer, accumulatorName);
 		CollectSinkOperator<OUT> operator = (CollectSinkOperator<OUT>) factory.getOperator();
 		CollectResultIterator<OUT> iterator = new CollectResultIterator<>(
-			operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
+				operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
 		CollectStreamSink<OUT> sink = new CollectStreamSink<>(stream, factory);
 		sink.name("Data stream collect sink");
 		env.addOperator(sink.getTransformation());
 
-		try {
-			JobClient jobClient = env.executeAsync("Data Stream Collect");
-			iterator.setJobClient(jobClient);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to execute data stream", e);
+		final JobClient jobClient = env.executeAsync(jobExecutionName);
+		iterator.setJobClient(jobClient);
+
+		return new ClientAndIterator<>(jobClient, iterator);
+	}
+
+	/**
+	 * Collects the contents of the given DataStream into a list, assuming that the stream is bounded.
+	 *
+	 * <p>This method blocks until the job execution is complete. By the time the method returns, the
+	 * job will have reached its FINISHED status.
+	 *
+	 * <p>Note that if the stream is unbounded, this method will never return and might fail with an
+	 * Out-of-Memory Error because it attempts to collect an infinite stream into a list.
+	 *
+	 * @throws Exception Exceptions that occur during the execution are forwarded.
+	 */
+	public static <E> List<E> collectBoundedStream(DataStream<E> stream, String jobName) throws Exception {
+		final ArrayList<E> collected = new ArrayList<>();
+		final Iterator<E> results = collectWithClient(stream, jobName).iterator;
+
+		// drain the iterator completely; every received element is appended to the list
+		results.forEachRemaining(collected::add);
+
+		collected.trimToSize();
+		return collected;
+	}
+
+	/**
+	 * Triggers execution of the DataStream application, collects the given number of records from the
+	 * stream, and cancels the execution afterwards. {@code numElements} must be greater than zero.
+	 */
+	public static <E> List<E> collectUnboundedStream(DataStream<E> stream, int numElements, String jobName) throws Exception {
+		// validate before launching, so that an invalid argument cannot leave a running job behind
+		checkArgument(numElements > 0, "numElements must be > 0");
+		final ClientAndIterator<E> clientAndIterator = collectWithClient(stream, jobName);
+		final List<E> result = collectRecordsFromUnboundedStream(clientAndIterator, numElements);
+		// cancel the job now that we have received enough elements
+		clientAndIterator.client.cancel().get();
+		return result;
+	}
+
+	public static <E> List<E> collectRecordsFromUnboundedStream(
+			final ClientAndIterator<E> client,
+			final int numElements) {
+
+		checkNotNull(client, "client");
+		checkArgument(numElements > 0, "numElement must be > 0");
+
+		final ArrayList<E> result = new ArrayList<>(numElements);
+		final Iterator<E> iterator = client.iterator;
+
+		while (iterator.hasNext()) {
+			result.add(iterator.next());
+			if (result.size() == numElements) {
+				return result;
+			}
 		}
 
-		return iterator;
+		throw new IllegalArgumentException(String.format(
+				"The stream ended before reaching the requested %d records. Only %d records were received.",
+				numElements, result.size()));
 	}
 
+	// ------------------------------------------------------------------------
+	//  Deriving a KeyedStream from a stream already partitioned by key
+	//  without a shuffle
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
 	 * {@link KeySelector}.
@@ -129,4 +225,21 @@ public final class DataStreamUtils {
 	 * Private constructor to prevent instantiation.
 	 */
 	private DataStreamUtils() {}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Bundles the {@link Iterator} that delivers the results of a streaming application with the
+	 * {@link JobClient} that is used to interact with the program. Neither member may be null.
+	 */
+	public static final class ClientAndIterator<E> {
+
+		public final JobClient client;
+		public final Iterator<E> iterator;
+
+		ClientAndIterator(JobClient jobClient, Iterator<E> resultIterator) {
+			this.client = checkNotNull(jobClient);
+			this.iterator = checkNotNull(resultIterator);
+		}
+	}
 }