Posted to commits@flink.apache.org by ch...@apache.org on 2022/07/26 10:59:55 UTC

[flink] 01/02: [FLINK-28644][datastream] Add DataStream#collectAsync

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1b8e776a915fd243e9088fb05be603b446cc663d
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jul 6 12:00:17 2022 +0200

    [FLINK-28644][datastream] Add DataStream#collectAsync
---
 docs/content.zh/docs/dev/datastream/overview.md    |  9 +--
 docs/content/docs/dev/datastream/overview.md       |  9 +--
 ...st_stream_execution_environment_completeness.py |  3 +-
 .../flink/streaming/api/datastream/DataStream.java | 84 +++++++++++++++++++++-
 .../environment/StreamExecutionEnvironment.java    | 10 +++
 .../flink/streaming/api/scala/DataStream.scala     | 38 +++++++++-
 .../scala/StreamingScalaAPICompletenessTest.scala  |  2 +
 .../datastream/DataStreamCollectTestITCase.java    | 73 +++++++++++++++++++
 8 files changed, 209 insertions(+), 19 deletions(-)

diff --git a/docs/content.zh/docs/dev/datastream/overview.md b/docs/content.zh/docs/dev/datastream/overview.md
index 71da0af1ab2..2bf572edb45 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -658,21 +658,16 @@ Flink 还提供了一个 sink 来收集 DataStream 的结果,它用于测试
 {{< tab "Java" >}}
 
 ```java
-import org.apache.flink.streaming.experimental.DataStreamUtils
-
 DataStream<Tuple2<String, Integer>> myResult = ...
-Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)
+Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();
 ```
 
 {{< /tab >}}
 {{< tab "Scala" >}}
 
 ```scala
-import org.apache.flink.streaming.experimental.DataStreamUtils
-import scala.collection.JavaConverters.asScalaIteratorConverter
-
 val myResult: DataStream[(String, Int)] = ...
-val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala
+val myOutput: Iterator[(String, Int)] = myResult.collectAsync()
 ```
 {{< /tab >}}
 {{< /tabs >}}
diff --git a/docs/content/docs/dev/datastream/overview.md b/docs/content/docs/dev/datastream/overview.md
index 1d36134b0c3..6b9d8f80713 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -777,21 +777,16 @@ Flink also provides a sink to collect DataStream results for testing and debuggi
 {{< tabs "125e228e-13b5-4c77-93a7-c0f436fcdd2f" >}}
 {{< tab "Java" >}}
 ```java
-import org.apache.flink.streaming.experimental.DataStreamUtils;
-
 DataStream<Tuple2<String, Integer>> myResult = ...;
-Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
+Iterator<Tuple2<String, Integer>> myOutput = myResult.collectAsync();
 ```
 
 {{< /tab >}}
 {{< tab "Scala" >}}
 
 ```scala
-import org.apache.flink.streaming.experimental.DataStreamUtils
-import scala.collection.JavaConverters.asScalaIteratorConverter
-
 val myResult: DataStream[(String, Int)] = ...
-val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala
+val myOutput: Iterator[(String, Int)] = myResult.collectAsync()
 ```
 {{< /tab >}}
 {{< /tabs >}}
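
To see the new API end-to-end, here is a minimal sketch assembled from the docs snippet above and the ITCase in this commit; the class name and element values are illustrative:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

public class CollectAsyncExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);

        // collectAsync() only sets up the collection; nothing runs yet
        try (CloseableIterator<Integer> iterator = stream.collectAsync()) {
            env.executeAsync(); // submit the job; the iterator is now consumable
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
        } // closing the iterator cancels the job if it is still running
    }
}
```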
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index 2db58fc414c..7fcefe5d21e 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -49,7 +49,8 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'socketTextStream', 'initializeContextEnvironment', 'readTextFile',
                 'setNumberOfExecutionRetries', 'executeAsync', 'registerJobListener',
                 'clearJobListeners', 'getJobListeners', 'fromSequence', 'getConfiguration',
-                'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed'}
+                'generateStreamGraph', 'getTransformations', 'areExplicitEnvironmentsAllowed',
+                'registerCollectIterator'}
 
 
 if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b0c3d649665..d2d28b49d6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
@@ -1368,7 +1369,48 @@ public class DataStream<T> {
         }
     }
 
-    ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception {
+    /**
+     * Sets up the collection of the elements in this {@link DataStream} and returns an iterator
+     * through which the collected elements can be retrieved once the job execution has
+     * started.
+     *
+     * <p>Caution: When multiple streams are being collected, it is recommended to consume all
+     * streams in parallel so as not to back-pressure the job.
+     *
+     * <p>Caution: Closing the returned iterator cancels the job! It is recommended to close all
+     * iterators once you are no longer interested in any of the collected streams.
+     *
+     * <p>This method is functionally equivalent to {@link #collectAsync(Collector)}.
+     *
+     * @return iterator over the contained elements
+     */
+    @Experimental
+    public CloseableIterator<T> collectAsync() {
+        final Collector<T> collector = new Collector<>();
+        collectAsync(collector);
+        return collector.getOutput();
+    }
+
+    /**
+     * Sets up the collection of the elements in this {@link DataStream}, which can be retrieved
+     * later via the given {@link Collector}.
+     *
+     * <p>Caution: When multiple streams are being collected, it is recommended to consume all
+     * streams in parallel so as not to back-pressure the job.
+     *
+     * <p>Caution: Closing the iterator from the collector cancels the job! It is recommended to
+     * close all iterators once you are no longer interested in any of the collected streams.
+     *
+     * <p>This method is functionally equivalent to {@link #collectAsync()}.
+     *
+     * <p>This method is meant to support use-cases where the application of a sink is done via a
+     * {@code Consumer<DataStream<T>>}, where it wouldn't be possible (or would be inconvenient) to
+     * return an iterator.
+     *
+     * @param collector a collector that can be used to retrieve the elements
+     */
+    @Experimental
+    public void collectAsync(Collector<T> collector) {
         TypeSerializer<T> serializer =
                 getType().createSerializer(getExecutionEnvironment().getConfig());
         String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString();
@@ -1387,8 +1429,44 @@ public class DataStream<T> {
         sink.name("Data stream collect sink");
         env.addOperator(sink.getTransformation());
 
-        final JobClient jobClient = env.executeAsync(jobExecutionName);
-        iterator.setJobClient(jobClient);
+        env.registerCollectIterator(iterator);
+        collector.setIterator(iterator);
+    }
+
+    /**
+     * This class acts as an accessor to elements collected via {@link #collectAsync(Collector)}.
+     *
+     * @param <T> the element type
+     */
+    @Experimental
+    public static class Collector<T> {
+        private CloseableIterator<T> iterator;
+
+        @Internal
+        void setIterator(CloseableIterator<T> iterator) {
+            this.iterator = iterator;
+        }
+
+        /**
+         * Returns an iterator over the collected elements. The returned iterator must only be used
+         * once the job execution has been triggered.
+         *
+         * <p>This method will always return the same iterator instance.
+         *
+         * @return iterator over collected elements
+         */
+        public CloseableIterator<T> getOutput() {
+            // we intentionally fail here instead of waiting, because it indicates a
+            // misunderstanding on the user's part, and waiting would usually just block the application
+            Preconditions.checkNotNull(iterator, "The job execution was not yet started.");
+            return iterator;
+        }
+    }
+
+    ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception {
+        final CloseableIterator<T> iterator = collectAsync();
+
+        final JobClient jobClient = getExecutionEnvironment().executeAsync(jobExecutionName);
 
         return new ClientAndIterator<>(jobClient, iterator);
     }
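
The Collector overload exists for sinks applied through a Consumer, where no value can be returned to the caller. A condensed sketch of that pattern, assuming the same setup as the ITCase in this commit:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;

import java.util.function.Consumer;

public class CollectorExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream.Collector<Integer> collector = new DataStream.Collector<>();

        // the sink is applied by a Consumer, so collectAsync() could not return anything here
        Consumer<DataStream<Integer>> applySink = stream -> stream.collectAsync(collector);
        applySink.accept(env.fromElements(1, 2, 3));

        // getOutput() is valid once collectAsync(collector) has run...
        try (CloseableIterator<Integer> iterator = collector.getOutput()) {
            env.executeAsync(); // ...but elements only arrive after submission
            iterator.forEachRemaining(System.out::println);
        }
    }
}
```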
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 0aadce4615d..e298905a566 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -97,6 +97,7 @@ import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -143,6 +144,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Public
 public class StreamExecutionEnvironment {
 
+    private static final List<CollectResultIterator<?>> collectIterators = new ArrayList<>();
+
+    @Internal
+    public void registerCollectIterator(CollectResultIterator<?> iterator) {
+        collectIterators.add(iterator);
+    }
+
     /**
      * The default name to use for a streaming job if no other name has been specified.
      *
@@ -2168,6 +2176,8 @@ public class StreamExecutionEnvironment {
         try {
             JobClient jobClient = jobClientFuture.get();
             jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
+            collectIterators.forEach(iterator -> iterator.setJobClient(jobClient));
+            collectIterators.clear();
             return jobClient;
         } catch (ExecutionException executionException) {
             final Throwable strippedException =
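
The registration above is what decouples collectAsync from job submission: collectAsync buffers the CollectResultIterator in the environment, and the next executeAsync binds the resulting JobClient to every buffered iterator before clearing the list (note that the list is declared static in this commit). The ordering this implies, sketched with an illustrative pipeline:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// registers a CollectResultIterator with the environment; no JobClient is bound yet
CloseableIterator<Long> iterator = env.fromSequence(1, 3).collectAsync();

// submission creates the JobClient and hands it to every registered iterator
env.executeAsync();

// only now can the iterator fetch results through the bound JobClient
iterator.forEachRemaining(System.out::println);
```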
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 2f45948e9fe..e0f05a2cb64 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.streaming.api.scala
 
-import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
+import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.eventtime.{TimestampAssigner, WatermarkGenerator, WatermarkStrategy}
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
@@ -1202,6 +1202,42 @@ class DataStream[T](stream: JavaStream[T]) {
   def executeAndCollect(jobExecutionName: String, limit: Int): List[T] =
     stream.executeAndCollect(jobExecutionName, limit).asScala.toList
 
+  /**
+   * Sets up the collection of the elements in this [[DataStream]] and returns an iterator through
+   * which the collected elements can be retrieved once the job execution has started.
+   *
+   * <p>Caution: When multiple streams are being collected, it is recommended to consume all
+   * streams in parallel so as not to back-pressure the job.
+   *
+   * <p>Caution: Closing the returned iterator cancels the job! It is recommended to close all
+   * iterators once you are no longer interested in any of the collected streams.
+   *
+   * @return
+   *   iterator over the contained elements
+   */
+  @Experimental
+  def collectAsync(): CloseableIterator[T] = CloseableIterator.fromJava(stream.collectAsync())
+
+  /**
+   * Sets up the collection of the elements in this [[DataStream]], which can be retrieved later via
+   * the given [[Collector]].
+   *
+   * <p>Caution: When multiple streams are being collected, it is recommended to consume all
+   * streams in parallel so as not to back-pressure the job.
+   *
+   * <p>Caution: Closing the iterator from the collector cancels the job! It is recommended to close
+   * all iterators once you are no longer interested in any of the collected streams.
+   *
+   * <p>This method is meant to support use-cases where the application of a sink is done via a
+   * [[java.util.function.Consumer]], where it wouldn't be possible (or would be inconvenient) to
+   * return an iterator.
+   *
+   * @param collector
+   *   a collector that can be used to retrieve the elements
+   */
+  @Experimental
+  def collectAsync(collector: JavaStream.Collector[T]): Unit = stream.collectAsync(collector)
+
   /**
    * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning is
    * not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
index 804061090ca..35b205f2b74 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingScalaAPICompletenessTest.scala
@@ -63,6 +63,8 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTransformations",
       "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" +
         ".areExplicitEnvironmentsAllowed",
+      "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment" +
+        ".registerCollectIterator",
 
       // TypeHints are only needed for Java API, Scala API doesn't need them
       "org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator.returns",
diff --git a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java
index 575c8e7bf2d..5157f418869 100644
--- a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamCollectTestITCase.java
@@ -31,6 +31,9 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for {@code DataStream} collect methods.
@@ -107,4 +110,74 @@ public class DataStreamCollectTestITCase extends TestLogger {
                 1,
                 results.size());
     }
+
+    @Test
+    public void testAsyncCollect() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final DataStream<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
+        final DataStream<Integer> stream2 = env.fromElements(6, 7, 8, 9, 10);
+
+        try (final CloseableIterator<Integer> iterator1 = stream1.collectAsync();
+                final CloseableIterator<Integer> iterator2 = stream2.collectAsync()) {
+            env.executeAsync();
+
+            for (int x = 1; x < 6; x++) {
+                assertThat(iterator1.hasNext()).isTrue();
+                assertThat(iterator1.next()).isEqualTo(x);
+            }
+
+            for (int x = 6; x < 11; x++) {
+                assertThat(iterator2.hasNext()).isTrue();
+                assertThat(iterator2.next()).isEqualTo(x);
+            }
+        }
+    }
+
+    @Test
+    public void testAsyncCollectWithCollector() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        final DataStream.Collector<Integer> collector1 = new DataStream.Collector<>();
+        final DataStream.Collector<Integer> collector2 = new DataStream.Collector<>();
+
+        defineWorkflowAndApplySink(
+                env,
+                stream -> stream.collectAsync(collector1),
+                stream -> stream.collectAsync(collector2));
+
+        try (final CloseableIterator<Integer> iterator1 = collector1.getOutput();
+                final CloseableIterator<Integer> iterator2 = collector2.getOutput()) {
+            env.executeAsync();
+
+            for (int x = 1; x < 6; x++) {
+                assertThat(iterator1.hasNext()).isTrue();
+                assertThat(iterator1.next()).isEqualTo(x);
+            }
+
+            for (int x = 6; x < 11; x++) {
+                assertThat(iterator2.hasNext()).isTrue();
+                assertThat(iterator2.next()).isEqualTo(x);
+            }
+        }
+    }
+
+    /**
+     * This method, while looking odd, was intentionally added to showcase the use-case that {@link
+     * DataStream#collectAsync(DataStream.Collector)} serves (w.r.t. the Consumer).
+     *
+     * <p>If whatever refactoring you're thinking of doesn't support this method in a convenient way,
+     * then you should reconsider it.
+     */
+    private static void defineWorkflowAndApplySink(
+            StreamExecutionEnvironment env,
+            Consumer<DataStream<Integer>> sink1Applier,
+            Consumer<DataStream<Integer>> sink2Applier) {
+
+        final DataStream<Integer> stream1 = env.fromElements(1, 2, 3, 4, 5);
+        final DataStream<Integer> stream2 = env.fromElements(6, 7, 8, 9, 10);
+
+        sink1Applier.accept(stream1);
+        sink2Applier.accept(stream2);
+    }
 }