Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/26 12:11:00 UTC

[GitHub] [flink] aljoscha commented on a change in pull request #13752: [FLINK-19508][DataStream] Add collect() operation on DataStream

aljoscha commented on a change in pull request #13752:
URL: https://github.com/apache/flink/pull/13752#discussion_r511910670



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##########
@@ -1294,6 +1303,92 @@ public ExecutionConfig getExecutionConfig() {
 		return sink;
 	}
 
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 *
+	 * <p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+	 */
+	public CloseableIterator<T> executeAndCollect() throws Exception {
+		return executeAndCollect("DataStream Collect");
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 *
+	 * <p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+	 */
+	public CloseableIterator<T> executeAndCollect(String jobExecutionName) throws Exception {
+		return executeAndCollectWithClient(jobExecutionName).iterator;
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 */
+	public List<T> executeAndCollect(int limit) throws Exception {
+		return executeAndCollect("DataStream Collect", limit);
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 */
+	public List<T> executeAndCollect(String jobExecutionName, int limit) throws Exception {
+		Preconditions.checkState(limit > 0, "Limit must be greater than 0");
+
+		ClientAndIterator<T> clientAndIterator = executeAndCollectWithClient(jobExecutionName);
+
+		try {
+			List<T> results = new ArrayList<>(limit);
+			while (clientAndIterator.iterator.hasNext() && limit > 0) {
+				results.add(clientAndIterator.iterator.next());
+				limit--;
+			}
+
+			return results;
+		} finally {
+			clientAndIterator.iterator.close();

Review comment:
       Makes me wonder why `ClientAndIterator` isn't `AutoCloseable` and doesn't do these two calls itself.
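
A minimal sketch of what this suggests, assuming `ClientAndIterator` keeps the public `client` and `iterator` fields the diff above relies on; the class body is illustrative, not the actual Flink implementation:

	import org.apache.flink.core.execution.JobClient;
	import org.apache.flink.util.CloseableIterator;

	/** Pairs the job client with the result iterator and owns the lifecycle of both. */
	public final class ClientAndIterator<T> implements AutoCloseable {

		public final JobClient client;
		public final CloseableIterator<T> iterator;

		public ClientAndIterator(JobClient client, CloseableIterator<T> iterator) {
			this.client = client;
			this.iterator = iterator;
		}

		@Override
		public void close() throws Exception {
			// Same order as the finally block above: release the local iterator
			// first, then cancel the remote job.
			iterator.close();
			client.cancel();
		}
	}

With that in place, the finally block in executeAndCollect(String, int) could shrink to a try-with-resources over the ClientAndIterator.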

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
##########
@@ -1294,6 +1303,92 @@ public ExecutionConfig getExecutionConfig() {
 		return sink;
 	}
 
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 *
+	 * <p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+	 */
+	public CloseableIterator<T> executeAndCollect() throws Exception {
+		return executeAndCollect("DataStream Collect");
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 *
+	 * <p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources.
+	 */
+	public CloseableIterator<T> executeAndCollect(String jobExecutionName) throws Exception {
+		return executeAndCollectWithClient(jobExecutionName).iterator;
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 */
+	public List<T> executeAndCollect(int limit) throws Exception {
+		return executeAndCollect("DataStream Collect", limit);
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 */
+	public List<T> executeAndCollect(String jobExecutionName, int limit) throws Exception {
+		Preconditions.checkState(limit > 0, "Limit must be greater than 0");
+
+		ClientAndIterator<T> clientAndIterator = executeAndCollectWithClient(jobExecutionName);
+
+		try {
+			List<T> results = new ArrayList<>(limit);
+			while (clientAndIterator.iterator.hasNext() && limit > 0) {
+				results.add(clientAndIterator.iterator.next());
+				limit--;
+			}
+
+			return results;
+		} finally {
+			clientAndIterator.iterator.close();
+			clientAndIterator.client.cancel();
+		}
+	}
+
+	private ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception {

Review comment:
       You're duplicating the code here because we want to remove `DataStreamUtils` in the future?
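
To illustrate the <b>IMPORTANT</b> note in the Javadoc above, a hedged usage sketch, assuming the methods land with the signatures shown in this diff; the environment setup and the example elements are made up for illustration:

	import java.util.List;

	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
	import org.apache.flink.util.CloseableIterator;

	public class ExecuteAndCollectExample {

		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			DataStream<Long> stream = env.fromElements(1L, 2L, 3L, 4L, 5L);

			// Unbounded variant: the returned iterator holds cluster resources,
			// so it must be closed, hence the try-with-resources.
			try (CloseableIterator<Long> it = stream.executeAndCollect("DataStream Collect")) {
				while (it.hasNext()) {
					System.out.println(it.next());
				}
			}

			// Bounded variant: collects at most `limit` elements and cancels the
			// job itself, so there is nothing for the caller to close.
			List<Long> firstThree = stream.executeAndCollect(3);
			System.out.println(firstThree);
		}
	}

Note that each executeAndCollect call triggers a separate execution of the dataflow, as the Javadoc describes.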




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org