Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/10 09:18:23 UTC

[GitHub] [flink] TsReaper opened a new pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

TsReaper opened a new pull request #12867:
URL: https://github.com/apache/flink/pull/12867


   ## What is the purpose of the change
   
   Currently `TableResult#collect` and `DataStreamUtils#collect` can only produce results for infinite streaming jobs if users explicitly enable checkpointing. It is odd to require this from users who just want to take a look at their data.
   
   This PR introduces a collect iterator with at-least-once semantics and one with exactly-once semantics but without fault tolerance. When the collect method is called, an iterator is picked automatically for the user:
   * If the user does not explicitly enable checkpointing, the exactly-once iterator without fault tolerance is used. That is, the iterator throws an exception once the job restarts.
   * If the user explicitly enables exactly-once checkpointing, the current implementation of the collect iterator is used.
   * If the user explicitly enables at-least-once checkpointing, the at-least-once iterator is used. That is, the iterator ignores both checkpoint information and job restarts.
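
The selection rule in the list above can be sketched roughly as follows. This is only an illustration in plain Java: the class, enum, and method names (`CollectIteratorSelection`, `CheckpointSetting`, `IteratorKind`, `pick`) are hypothetical and are not taken from the code in this PR.

```java
public class CollectIteratorSelection {

    /** Hypothetical stand-in for the checkpointing setting of the job. */
    enum CheckpointSetting { DISABLED, EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Hypothetical labels for the three iterator behaviors described above. */
    enum IteratorKind {
        EXACTLY_ONCE_NO_FAULT_TOLERANCE, // throws once the job restarts
        EXACTLY_ONCE_CHECKPOINTED,       // the existing checkpoint-based iterator
        AT_LEAST_ONCE                    // ignores checkpoints and job restarts
    }

    /** Maps the checkpointing setting to the iterator that would be picked. */
    static IteratorKind pick(CheckpointSetting setting) {
        switch (setting) {
            case EXACTLY_ONCE:
                return IteratorKind.EXACTLY_ONCE_CHECKPOINTED;
            case AT_LEAST_ONCE:
                return IteratorKind.AT_LEAST_ONCE;
            default:
                // Checkpointing not enabled: exactly-once, but no fault tolerance.
                return IteratorKind.EXACTLY_ONCE_NO_FAULT_TOLERANCE;
        }
    }

    public static void main(String[] args) {
        for (CheckpointSetting setting : CheckpointSetting.values()) {
            System.out.println(setting + " -> " + pick(setting));
        }
    }
}
```

The point of the sketch is that the choice depends only on the checkpointing setting, so users never have to select an iterator themselves.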
   
   ## Brief change log
   
    - Refactor tests for datastream / table collect
    - Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance
   
   ## Verifying this change
   
   This change is already covered by the existing datastream / table collect tests. It also adds new tests and can be verified by running those unit test cases.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 577a355b33c5bdd7176f207201dc1b33d98c665a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4560) 
   * a64ca67c2e50380c4b6aca1748fb71a3c620377a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4571) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * 14690b549ea2a758df20a373a9ba272be2f3ecf4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4396) 
   





[GitHub] [flink] godfreyhe commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r456225483



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -132,10 +132,22 @@
 	 *  }
 	 * }</pre>
 	 *
-	 * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
-	 * which requires the checkpointing mechanism to be enabled.
-	 * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
-	 * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+	 * <p>This method has slightly different behaviors under different checkpointing settings
+	 * (to enable checkpointing for a streaming job,
+	 * set checkpointing properties through {@link TableConfig#getConfiguration()}).
+	 * <ul>
+	 *     <li>If the user is running a batch job, or does not enable checkpointing for a streaming job,
+	 *     this method has neither exactly-once nor at-least-once guarantee.
+	 *     Query results are immediately accessible by the clients once they're produced,
+	 *     but the function calls will throw an exception when the job fails and restarts.

Review comment:
       ditto

##########
File path: docs/dev/table/sql/queries.md
##########
@@ -145,21 +145,16 @@ A SELECT statement or a VALUES statement can be executed to collect the content
 `TableResult.collect()` method returns a closeable row iterator. The select job will not be finished unless all result data has been collected. We should actively close the job to avoid resource leak through the `CloseableIterator#close()` method. 
 We can also print the select result to client console through the `TableResult.print()` method. The result data in `TableResult` can be accessed only once. Thus, `collect()` and `print()` must not be called after each other.
 
-For streaming job, `TableResult.collect()` method or `TableResult.print` method guarantee end-to-end exactly-once record delivery. This requires the checkpointing mechanism to be enabled. By default, checkpointing is disabled. To enable checkpointing, we can set checkpointing properties (see the <a href="{{ site.baseurl }}/ops/config.html#checkpointing">checkpointing config</a> for details) through `TableConfig`.
-So a result record can be accessed by client only after its corresponding checkpoint completes.
-
-**Notes:** For streaming mode, only append-only query is supported now. 
+`TableResult.collect()` and `TableResult.print()` have slightly different behaviors under different checkpointing settings (to enable checkpointing for a streaming job, see <a href="{{ site.baseurl }}/ops/config.html#checkpointing">checkpointing config</a>).
+* If the user is running a batch job, or does not enable checkpointing for a streaming job, `TableResult.collect()` and `TableResult.print()` have neither exactly-once nor at-least-once guarantee. Query results are immediately accessible by the clients once they're produced, but the function calls will throw an exception when the job fails and restarts;

Review comment:
       Users will not know what `the function calls` refers to.

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -146,10 +158,22 @@
 	/**
 	 * Print the result contents as tableau form to client console.
 	 *
-	 * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
-	 * which requires the checkpointing mechanism to be enabled.
-	 * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
-	 * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+	 * <p>This method has slightly different behaviors under different checkpointing settings
+	 * (to enable checkpointing for a streaming job,
+	 * set checkpointing properties through {@link TableConfig#getConfiguration()}).
+	 * <ul>
+	 *     <li>If the user is running a batch job, or does not enable checkpointing for a streaming job,
+	 *     this method has neither exactly-once nor at-least-once guarantee.
+	 *     Query results are immediately accessible by the clients once they're produced,
+	 *     but the function calls will throw an exception when the job fails and restarts.

Review comment:
       ditto

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -132,10 +132,22 @@
 	 *  }
 	 * }</pre>
 	 *
-	 * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
-	 * which requires the checkpointing mechanism to be enabled.
-	 * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
-	 * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+	 * <p>This method has slightly different behaviors under different checkpointing settings
+	 * (to enable checkpointing for a streaming job,
+	 * set checkpointing properties through {@link TableConfig#getConfiguration()}).
+	 * <ul>
+	 *     <li>If the user is running a batch job, or does not enable checkpointing for a streaming job,
+	 *     this method has neither exactly-once nor at-least-once guarantee.
+	 *     Query results are immediately accessible by the clients once they're produced,
+	 *     but the function calls will throw an exception when the job fails and restarts.
+	 *     <li>If the user enables exactly-once checkpointing for a streaming job,

Review comment:
       `If exactly-once checkpointing is enabled for a streaming job,` ?

##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
##########
@@ -146,10 +158,22 @@
 	/**
 	 * Print the result contents as tableau form to client console.
 	 *
-	 * <p>For streaming mode, this method guarantees end-to-end exactly-once record delivery
-	 * which requires the checkpointing mechanism to be enabled.
-	 * By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
-	 * (see ExecutionCheckpointingOptions) through {@link TableConfig#getConfiguration()}.
+	 * <p>This method has slightly different behaviors under different checkpointing settings

Review comment:
       please update the doc of `TableResult` in flink-python







[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * a1ea81a9dd67cab4c8b483dfe3eed51d7ab4a9e2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4748) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * c39f4a57e2cf4383ce76795f118096a1fffcbd6d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4556) 
   * 577a355b33c5bdd7176f207201dc1b33d98c665a UNKNOWN
   





[GitHub] [flink] TsReaper commented on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-662810921


   Azure passed in https://dev.azure.com/tsreaper96/Flink/_build/results?buildId=51





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * 14690b549ea2a758df20a373a9ba272be2f3ecf4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4396) 
   * c3ac42923fc06b5d0d86c01c814918916ab86790 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4524) 
   





[GitHub] [flink] godfreyhe commented on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-659305960


   please add some comments for new class, e.g. `AbstractCollectResultBuffer`





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * bf49c2297508e5dde1a9e0d8d4dfbafefc5f46f2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4623) 
   * fb16ba0168cae5742f27c87acae3c477dd42150b UNKNOWN
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * fb16ba0168cae5742f27c87acae3c477dd42150b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4717) 
   * a1ea81a9dd67cab4c8b483dfe3eed51d7ab4a9e2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 577a355b33c5bdd7176f207201dc1b33d98c665a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4560) 
   * a64ca67c2e50380c4b6aca1748fb71a3c620377a UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * c3ac42923fc06b5d0d86c01c814918916ab86790 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4524) 
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 937eda6d9a43cfdfb24f97034ee120533c2c904b UNKNOWN
   



[GitHub] [flink] TsReaper commented on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
TsReaper commented on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-658638505


   The failed tests are not related to this PR; the build succeeds at https://dev.azure.com/tsreaper96/Flink/_build/results?buildId=38&view=results





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 577a355b33c5bdd7176f207201dc1b33d98c665a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4560) 
   



[GitHub] [flink] JingsongLi merged pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #12867:
URL: https://github.com/apache/flink/pull/12867


   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * 14690b549ea2a758df20a373a9ba272be2f3ecf4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4396) 
   



[GitHub] [flink] TsReaper commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r455633324



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
##########
@@ -40,18 +43,37 @@
 	public CollectResultIterator(
 			CompletableFuture<OperatorID> operatorIdFuture,
 			TypeSerializer<T> serializer,
-			String accumulatorName) {
-		this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName);
+			String accumulatorName,
+			CheckpointConfig checkpointConfig) {
+		if (checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE) {
+			if (checkpointConfig.getCheckpointInterval() >= CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME) {
+				this.fetcher = new CollectResultFetcher<>(
+					new CheckpointedCollectResultBuffer<>(serializer),
+					operatorIdFuture,
+					accumulatorName);
+			} else {
+				this.fetcher = new CollectResultFetcher<>(
+					new UncheckpointedCollectResultBuffer<>(serializer, false),
+					operatorIdFuture,
+					accumulatorName);
+			}

Review comment:
       If `checkpointConfig.getCheckpointInterval() >= CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME`, we can be sure that the user has explicitly enabled checkpointing. Otherwise we would have to stay in sync with the default value of the checkpoint interval in `CheckpointCoordinatorConfiguration`.
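
The selection rule discussed in this comment can be sketched in plain Java. This is only an illustration under stated assumptions, not the code from the PR: `Mode`, `BufferKind`, `pick`, and the value `10` for the minimum interval are hypothetical stand-ins, and the at-least-once branch follows the PR description rather than the quoted hunk, which does not show it.

```java
public class CollectBufferChoice {

    /** Stand-in for Flink's CheckpointingMode (hypothetical, for illustration only). */
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** The three iterator flavours named in the PR description (hypothetical names). */
    enum BufferKind { CHECKPOINTED, UNCHECKPOINTED_FAIL_ON_RESTART, UNCHECKPOINTED_TOLERANT }

    // Assumed value; the PR compares against
    // CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME instead.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;

    static BufferKind pick(Mode mode, long checkpointIntervalMillis) {
        if (mode == Mode.EXACTLY_ONCE) {
            if (checkpointIntervalMillis >= MINIMAL_CHECKPOINT_TIME) {
                // The user explicitly enabled exactly-once checkpointing.
                return BufferKind.CHECKPOINTED;
            }
            // No usable interval was set: exactly once, but no fault tolerance.
            return BufferKind.UNCHECKPOINTED_FAIL_ON_RESTART;
        }
        // At-least-once: ignore checkpoint information and job restarts.
        return BufferKind.UNCHECKPOINTED_TOLERANT;
    }

    public static void main(String[] args) {
        System.out.println(pick(Mode.EXACTLY_ONCE, 500L));   // prints CHECKPOINTED
        System.out.println(pick(Mode.EXACTLY_ONCE, -1L));    // prints UNCHECKPOINTED_FAIL_ON_RESTART
        System.out.println(pick(Mode.AT_LEAST_ONCE, 500L));  // prints UNCHECKPOINTED_TOLERANT
    }
}
```

Comparing against the minimum rather than a specific default means the sketch does not need to know what the "unset" value is, only that it lies below the minimum.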







[GitHub] [flink] godfreyhe commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r457161336



##########
File path: flink-python/pyflink/table/table_result.py
##########
@@ -134,10 +134,19 @@ def print(self):
         """
         Print the result contents as tableau form to client console.
 
-        For streaming mode, this method guarantees end-to-end exactly-once record delivery
-        which requires the checkpointing mechanism to be enabled.
-        By default, checkpointing is disabled. To enable checkpointing, set checkpointing properties
-        (see ExecutionCheckpointingOptions) through `TableConfig#getConfiguration()`.
+        This method has slightly different behaviors under different checkpointing settings.
+
+            - For batch jobs or streaming jobs without checkpointing,
+              this method has neither exactly-once nor at-least-once guarantee.

Review comment:
       Reduce indentation







[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * a64ca67c2e50380c4b6aca1748fb71a3c620377a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4571) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * a64ca67c2e50380c4b6aca1748fb71a3c620377a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4571) 
   * bf49c2297508e5dde1a9e0d8d4dfbafefc5f46f2 UNKNOWN
   



[GitHub] [flink] flinkbot commented on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * 14690b549ea2a758df20a373a9ba272be2f3ecf4 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * c3ac42923fc06b5d0d86c01c814918916ab86790 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4524) 
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   



[GitHub] [flink] godfreyhe commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r454930701



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
##########
@@ -40,18 +43,37 @@
 	public CollectResultIterator(
 			CompletableFuture<OperatorID> operatorIdFuture,
 			TypeSerializer<T> serializer,
-			String accumulatorName) {
-		this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName);
+			String accumulatorName,
+			CheckpointConfig checkpointConfig) {
+		if (checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE) {
+			if (checkpointConfig.getCheckpointInterval() >= CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME) {
+				this.fetcher = new CollectResultFetcher<>(
+					new CheckpointedCollectResultBuffer<>(serializer),
+					operatorIdFuture,
+					accumulatorName);
+			} else {
+				this.fetcher = new CollectResultFetcher<>(
+					new UncheckpointedCollectResultBuffer<>(serializer, false),
+					operatorIdFuture,
+					accumulatorName);
+			}

Review comment:
       I think this branch is unnecessary, because a checkpoint interval smaller than `MINIMAL_CHECKPOINT_TIME` is illegal. Many places already validate this, e.g. `CheckpointConfig.setCheckpointInterval`.
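
The kind of setter validation referred to here can be sketched as follows. This is a self-contained illustration and not Flink's code: the class `IntervalConfig`, the default of `-1`, and the minimum of `10` ms are assumptions made for the example.

```java
public class IntervalConfig {

    // Assumed values, for illustration only.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;
    static final long DEFAULT_INTERVAL = -1L;

    // Starts at the default, which lies below the minimum.
    private long checkpointInterval = DEFAULT_INTERVAL;

    /** Rejects any interval below the minimum, so callers cannot set an illegal value. */
    public void setCheckpointInterval(long interval) {
        if (interval < MINIMAL_CHECKPOINT_TIME) {
            throw new IllegalArgumentException(
                "Checkpoint interval must be at least " + MINIMAL_CHECKPOINT_TIME + " ms");
        }
        this.checkpointInterval = interval;
    }

    public long getCheckpointInterval() {
        return checkpointInterval;
    }

    /** True only if the setter has been called successfully. */
    public boolean isCheckpointingEnabled() {
        return checkpointInterval >= MINIMAL_CHECKPOINT_TIME;
    }

    public static void main(String[] args) {
        IntervalConfig config = new IntervalConfig();
        System.out.println(config.isCheckpointingEnabled()); // prints false
        config.setCheckpointInterval(100L);
        System.out.println(config.isCheckpointingEnabled()); // prints true
    }
}
```

Under these assumptions, an interval below the minimum can never come from the setter; it can only be the untouched default.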

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME;
+
+/**
+ * Random IT cases for {@link CollectSinkFunction}.
+ * It will perform random insert, random checkpoint and random restart.
+ */
+public class CollectSinkFunctionRandomITCase extends TestLogger {
+
+	private static final int MAX_RESULTS_PER_BATCH = 3;
+	private static final JobID TEST_JOB_ID = new JobID();
+	private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
+
+	private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
+
+	private CollectSinkFunctionTestWrapper<Integer> functionWrapper;
+	private boolean jobFinished;
+
+	@Test
+	public void testUncheckpointedFunction() throws Exception {
+		// run multiple times for this random test
+		for (int testCount = 30; testCount > 0; testCount--) {
+			functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, MAX_RESULTS_PER_BATCH * 4);
+			jobFinished = false;
+
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 50; i++) {
+				expected.add(i);
+			}
+			Thread feeder = new ThreadWithException(new UncheckpointedDataFeeder(expected));
+
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
+
+			functionWrapper.closeWrapper();
+		}
+	}
+
+	@Test
+	public void testCheckpointedFunction() throws Exception {
+		// run multiple times for this random test
+		for (int testCount = 30; testCount > 0; testCount--) {
+			functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, MAX_RESULTS_PER_BATCH * 4);
+			jobFinished = false;
+
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 50; i++) {
+				expected.add(i);
+			}
+			Thread feeder = new ThreadWithException(new CheckpointedDataFeeder(expected));
+
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
+
+			functionWrapper.closeWrapper();
+		}
+	}
+
+	private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
+		CollectClient collectClient = new CollectClient();
+		Thread client = new ThreadWithException(collectClient);
+
+		Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
+			feeder.interrupt();
+			client.interrupt();
+			e.printStackTrace();
+		};
+		feeder.setUncaughtExceptionHandler(exceptionHandler);
+		client.setUncaughtExceptionHandler(exceptionHandler);
+
+		feeder.start();
+		client.start();
+		feeder.join();
+		client.join();
+
+		return collectClient.results;
+	}
+
+	private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
+		Collections.sort(expected);
+		Collections.sort(actual);
+		Assert.assertThat(actual, CoreMatchers.is(expected));
+	}
+
+	/**
+	 * A {@link RunnableWithException} feeding data to the function. It will fail when half of the data is fed.
+	 */
+	private class UncheckpointedDataFeeder implements RunnableWithException {
+
+		private LinkedList<Integer> data;
+		private final List<Integer> checkpointedData;

Review comment:
       Why does `UncheckpointedDataFeeder` have a field named `checkpointedData`?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectSinkFunctionTestWrapper.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A wrapper class for creating, checkpointing and closing
+ * {@link org.apache.flink.streaming.api.operators.collect.CollectSinkFunction} for tests.
+ */
+public class CollectSinkFunctionTestWrapper<IN> {
+
+	public static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
+
+	private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+	private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+	private static final int MAX_RETIRES = 100;
+
+	private final TypeSerializer<IN> serializer;
+	private final int maxBytesPerBatch;
+
+	private final IOManager ioManager;
+	private final StreamingRuntimeContext runtimeContext;
+	private final MockOperatorEventGateway gateway;
+	private final CollectSinkOperatorCoordinator coordinator;
+	private final MockFunctionInitializationContext functionInitializationContext;
+
+	private CollectSinkFunction<IN> function;
+
+	public CollectSinkFunctionTestWrapper(TypeSerializer<IN> serializer, int maxBytesPerBatch) throws Exception {
+		this.serializer = serializer;
+		this.maxBytesPerBatch = maxBytesPerBatch;
+
+		this.ioManager = new IOManagerAsync();
+		MockEnvironment environment = new MockEnvironmentBuilder()
+			.setTaskName("mockTask")
+			.setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+			.setIOManager(ioManager)
+			.build();
+		this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+		this.gateway = new MockOperatorEventGateway();
+
+		this.coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
+		this.coordinator.start();
+
+		this.functionInitializationContext = new MockFunctionInitializationContext();
+	}
+
+	public void closeWrapper() throws Exception {
+		coordinator.close();
+		ioManager.close();
+	}
+
+	public CollectSinkOperatorCoordinator getCoordinator() {
+		return coordinator;
+	}
+
+	public void openFunction() throws Exception {
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void openFunctionWithState() throws Exception {
+		functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.initializeState(functionInitializationContext);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void invoke(IN record) throws Exception {
+		function.invoke(record, null);
+	}
+
+	public void checkpointFunction(long checkpointId) throws Exception {
+		function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+		functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+	}
+
+	public void checkpointComplete(long checkpointId) {
+		function.notifyCheckpointComplete(checkpointId);
+		functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+	}
+
+	public void closeFunctionNormally() throws Exception {
+		// this is a normal shutdown
+		function.accumulateFinalResults();
+		function.close();
+	}
+
+	public void closeFuntionAbnormally() throws Exception {

Review comment:
       typo: `Funtion`

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/UncheckpointedCollectResultBuffer.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.io.IOException;
+
+/**
+ * A buffer which encapsulates the logic of dealing with the response from the {@link CollectSinkFunction}.
+ * It ignores the checkpoint related fields in the response.
+ * See Java doc of {@link CollectSinkFunction} for explanation of this communication protocol.
+ */
+public class UncheckpointedCollectResultBuffer<T> extends AbstractCollectResultBuffer<T> {
+
+	private final boolean failureTolerance;
+
+	public UncheckpointedCollectResultBuffer(TypeSerializer<T> serializer, boolean failureTolerance) {
+		super(serializer);
+		this.failureTolerance = failureTolerance;
+	}
+
+	@Override
+	public void dealWithResponse(CollectCoordinationResponse response, long responseOffset) throws IOException {
+		String responseVersion = response.getVersion();
+
+		if (!version.equals(responseVersion)) {
+			if (!INIT_VERSION.equals(version) && !failureTolerance) {
+				// sink restarted but we do not tolerate failure
+				throw new RuntimeException("Job restarted");
+			}
+
+			reset();
+			version = responseVersion;
+		}
+
+		addResults(response, responseOffset);
+		// the results are instantly visible by users
+		userVisibleTail = offset;

Review comment:
       Could `addResults` be turned into a utility method? Then variables such as `offset` would be handled in one method, which would make this method more readable.
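
       One possible reading of this suggestion, as a minimal sketch only: the helper becomes a pure function that returns the not-yet-seen results, and `dealWithResponse` is the single place that updates `offset`. `ResultBufferSketch`, `newResults`, the empty-string `INIT_VERSION` and the plain `List` arguments are hypothetical stand-ins, not the classes or signatures in this PR; keeping already buffered results when a new version arrives is also an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for the buffer under review; not the Flink class.
class ResultBufferSketch<T> {

    // Assumption: the real initial version value is not shown in this thread.
    private static final String INIT_VERSION = "";

    private final boolean failureTolerance;
    private final List<T> buffer = new ArrayList<>();
    private String version = INIT_VERSION;
    private long offset = 0; // offset of the next result expected from the sink

    ResultBufferSketch(boolean failureTolerance) {
        this.failureTolerance = failureTolerance;
    }

    // Pure utility: returns the part of `results` that starts at `offset`.
    // It reads no fields and changes no state.
    static <E> List<E> newResults(List<E> results, long responseOffset, long offset) {
        long skip = offset - responseOffset;
        if (skip < 0 || skip >= results.size()) {
            // Nothing usable: either a gap (not expected here) or only old results.
            return Collections.emptyList();
        }
        return results.subList((int) skip, results.size());
    }

    void dealWithResponse(String responseVersion, long responseOffset, List<T> results) {
        if (!version.equals(responseVersion)) {
            if (!INIT_VERSION.equals(version) && !failureTolerance) {
                // sink restarted but we do not tolerate failure
                throw new IllegalStateException("Job restarted");
            }
            // Assumption: a restarted sink counts offsets from zero again.
            offset = 0;
            version = responseVersion;
        }

        // All bookkeeping happens here, in one method.
        List<T> added = newResults(results, responseOffset, offset);
        buffer.addAll(added);
        offset += added.size();
    }

    List<T> results() {
        return new ArrayList<>(buffer);
    }

    long offset() {
        return offset;
    }
}
```

       With `failureTolerance` set to `true`, a version change only resets the offset, so results may be seen more than once (at least once). With `false`, the second version seen raises an exception.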

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/CollectSinkFunctionTestWrapper.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect.utils;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
+import org.apache.flink.streaming.api.operators.collect.CollectCoordinationResponse;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkFunction;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A wrapper class for creating, checkpointing and closing
+ * {@link org.apache.flink.streaming.api.operators.collect.CollectSinkFunction} for tests.
+ */
+public class CollectSinkFunctionTestWrapper<IN> {
+
+	public static final String ACCUMULATOR_NAME = "tableCollectAccumulator";
+
+	private static final int SOCKET_TIMEOUT_MILLIS = 1000;
+	private static final int FUTURE_TIMEOUT_MILLIS = 10000;
+	private static final int MAX_RETIRES = 100;
+
+	private final TypeSerializer<IN> serializer;
+	private final int maxBytesPerBatch;
+
+	private final IOManager ioManager;
+	private final StreamingRuntimeContext runtimeContext;
+	private final MockOperatorEventGateway gateway;
+	private final CollectSinkOperatorCoordinator coordinator;
+	private final MockFunctionInitializationContext functionInitializationContext;
+
+	private CollectSinkFunction<IN> function;
+
+	public CollectSinkFunctionTestWrapper(TypeSerializer<IN> serializer, int maxBytesPerBatch) throws Exception {
+		this.serializer = serializer;
+		this.maxBytesPerBatch = maxBytesPerBatch;
+
+		this.ioManager = new IOManagerAsync();
+		MockEnvironment environment = new MockEnvironmentBuilder()
+			.setTaskName("mockTask")
+			.setManagedMemorySize(4 * MemoryManager.DEFAULT_PAGE_SIZE)
+			.setIOManager(ioManager)
+			.build();
+		this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
+		this.gateway = new MockOperatorEventGateway();
+
+		this.coordinator = new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
+		this.coordinator.start();
+
+		this.functionInitializationContext = new MockFunctionInitializationContext();
+	}
+
+	public void closeWrapper() throws Exception {
+		coordinator.close();
+		ioManager.close();
+	}
+
+	public CollectSinkOperatorCoordinator getCoordinator() {
+		return coordinator;
+	}
+
+	public void openFunction() throws Exception {
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void openFunctionWithState() throws Exception {
+		functionInitializationContext.getOperatorStateStore().revertToLastSuccessCheckpoint();
+		function = new CollectSinkFunction<>(serializer, maxBytesPerBatch, ACCUMULATOR_NAME);
+		function.setRuntimeContext(runtimeContext);
+		function.setOperatorEventGateway(gateway);
+		function.initializeState(functionInitializationContext);
+		function.open(new Configuration());
+		coordinator.handleEventFromOperator(0, gateway.getNextEvent());
+	}
+
+	public void invoke(IN record) throws Exception {
+		function.invoke(record, null);
+	}
+
+	public void checkpointFunction(long checkpointId) throws Exception {
+		function.snapshotState(new MockFunctionSnapshotContext(checkpointId));
+		functionInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+	}
+
+	public void checkpointComplete(long checkpointId) {
+		function.notifyCheckpointComplete(checkpointId);
+		functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
+	}
+
+	public void closeFunctionNormally() throws Exception {
+		// this is a normal shutdown
+		function.accumulateFinalResults();
+		function.close();
+	}
+
+	public void closeFuntionAbnormally() throws Exception {
+		// this is an exceptional shutdown
+		function.close();
+		coordinator.subtaskFailed(0, null);
+	}
+
+	public CollectCoordinationResponse sendRequestAndGetResponse(String version, long offset) throws Exception {
+		CollectCoordinationResponse response;
+		for (int i = 0; i < MAX_RETIRES; i++) {
+			response = sendRequest(version, offset);
+			if (response.getLastCheckpointedOffset() >= 0) {
+				return response;
+			}
+		}
+		throw new RuntimeException("Too many retries in sendRequestAndGetValidResponse");
+	}
+
+	private CollectCoordinationResponse sendRequest(String version, long offset) throws Exception {
+		CollectCoordinationRequest request = new CollectCoordinationRequest(version, offset);
+		// we add a timeout to not block the tests if it fails
+		return ((CollectCoordinationResponse) coordinator
+			.handleCoordinationRequest(request).get(FUTURE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
+	}
+
+	public Tuple2<Long, CollectCoordinationResponse> getAccumualtorResults() throws Exception {

Review comment:
       typo: `Accumualtor`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * 14690b549ea2a758df20a373a9ba272be2f3ecf4 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4396) 
   * c3ac42923fc06b5d0d86c01c814918916ab86790 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 937eda6d9a43cfdfb24f97034ee120533c2c904b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4552) 
   * c39f4a57e2cf4383ce76795f118096a1fffcbd6d Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4556) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * bf49c2297508e5dde1a9e0d8d4dfbafefc5f46f2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4623) 
   * fb16ba0168cae5742f27c87acae3c477dd42150b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4717) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * bf49c2297508e5dde1a9e0d8d4dfbafefc5f46f2 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4623) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 937eda6d9a43cfdfb24f97034ee120533c2c904b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4552) 
   * c39f4a57e2cf4383ce76795f118096a1fffcbd6d UNKNOWN
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * c39f4a57e2cf4383ce76795f118096a1fffcbd6d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4556) 
   * 577a355b33c5bdd7176f207201dc1b33d98c665a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4560) 
   





[GitHub] [flink] TsReaper commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r455641265



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
##########
@@ -40,18 +43,37 @@
 	public CollectResultIterator(
 			CompletableFuture<OperatorID> operatorIdFuture,
 			TypeSerializer<T> serializer,
-			String accumulatorName) {
-		this.fetcher = new CollectResultFetcher<>(operatorIdFuture, serializer, accumulatorName);
+			String accumulatorName,
+			CheckpointConfig checkpointConfig) {
+		if (checkpointConfig.getCheckpointingMode() == CheckpointingMode.EXACTLY_ONCE) {
+			if (checkpointConfig.getCheckpointInterval() >= CheckpointCoordinatorConfiguration.MINIMAL_CHECKPOINT_TIME) {
+				this.fetcher = new CollectResultFetcher<>(
+					new CheckpointedCollectResultBuffer<>(serializer),
+					operatorIdFuture,
+					accumulatorName);
+			} else {
+				this.fetcher = new CollectResultFetcher<>(
+					new UncheckpointedCollectResultBuffer<>(serializer, false),
+					operatorIdFuture,
+					accumulatorName);
+			}

Review comment:
       OK... It seems that `checkpointConfig.isCheckpointingEnabled()` is a better solution.
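
A minimal sketch of the selection this comment points toward, assuming the three cases listed in the PR description. It does not depend on Flink: `Config`, `Mode`, `BufferKind` and `choose` are hypothetical stand-ins, not the classes in this PR. Only the accessor names `isCheckpointingEnabled()` and `getCheckpointingMode()` mirror `CheckpointConfig`.

```java
public class CollectBufferSelection {

    /** Stand-in for Flink's CheckpointingMode (hypothetical, illustration only). */
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    /** Labels invented for this sketch; the PR uses concrete buffer classes instead. */
    enum BufferKind {
        CHECKPOINTED,                    // exactly once, backed by checkpoints
        UNCHECKPOINTED_FAIL_ON_RESTART,  // exactly once without fault tolerance
        UNCHECKPOINTED_IGNORE_RESTART    // at least once
    }

    /** Stand-in for CheckpointConfig exposing only the two accessors used here. */
    static final class Config {
        private final boolean checkpointingEnabled;
        private final Mode mode;

        Config(boolean checkpointingEnabled, Mode mode) {
            this.checkpointingEnabled = checkpointingEnabled;
            this.mode = mode;
        }

        boolean isCheckpointingEnabled() {
            return checkpointingEnabled;
        }

        Mode getCheckpointingMode() {
            return mode;
        }
    }

    /** Picks a buffer kind by first asking whether checkpointing is enabled at all. */
    static BufferKind choose(Config config) {
        if (!config.isCheckpointingEnabled()) {
            // No checkpoint configured: results stay exactly once until the job restarts.
            return BufferKind.UNCHECKPOINTED_FAIL_ON_RESTART;
        }
        if (config.getCheckpointingMode() == Mode.EXACTLY_ONCE) {
            return BufferKind.CHECKPOINTED;
        }
        // At-least-once checkpointing: ignore checkpoint information and restarts.
        return BufferKind.UNCHECKPOINTED_IGNORE_RESTART;
    }

    public static void main(String[] args) {
        System.out.println(choose(new Config(false, Mode.EXACTLY_ONCE)));
        System.out.println(choose(new Config(true, Mode.EXACTLY_ONCE)));
        System.out.println(choose(new Config(true, Mode.AT_LEAST_ONCE)));
    }
}
```

Checking the enabled flag first keeps the interval comparison against `MINIMAL_CHECKPOINT_TIME` out of the iterator and removes one level of nesting from the constructor.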







[GitHub] [flink] flinkbot commented on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656577817


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 14690b549ea2a758df20a373a9ba272be2f3ecf4 (Fri Jul 10 09:20:37 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-18558).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] TsReaper commented on a change in pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
TsReaper commented on a change in pull request #12867:
URL: https://github.com/apache/flink/pull/12867#discussion_r455633715



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionRandomITCase.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.collect;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper;
+import org.apache.flink.streaming.api.operators.collect.utils.TestJobClient;
+import org.apache.flink.util.OptionalFailure;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.streaming.api.operators.collect.utils.CollectSinkFunctionTestWrapper.ACCUMULATOR_NAME;
+
+/**
+ * Random IT cases for {@link CollectSinkFunction}.
+ * It will perform random insert, random checkpoint and random restart.
+ */
+public class CollectSinkFunctionRandomITCase extends TestLogger {
+
+	private static final int MAX_RESULTS_PER_BATCH = 3;
+	private static final JobID TEST_JOB_ID = new JobID();
+	private static final OperatorID TEST_OPERATOR_ID = new OperatorID();
+
+	private static final TypeSerializer<Integer> serializer = IntSerializer.INSTANCE;
+
+	private CollectSinkFunctionTestWrapper<Integer> functionWrapper;
+	private boolean jobFinished;
+
+	@Test
+	public void testUncheckpointedFunction() throws Exception {
+		// run multiple times for this random test
+		for (int testCount = 30; testCount > 0; testCount--) {
+			functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, MAX_RESULTS_PER_BATCH * 4);
+			jobFinished = false;
+
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 50; i++) {
+				expected.add(i);
+			}
+			Thread feeder = new ThreadWithException(new UncheckpointedDataFeeder(expected));
+
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
+
+			functionWrapper.closeWrapper();
+		}
+	}
+
+	@Test
+	public void testCheckpointedFunction() throws Exception {
+		// run multiple times for this random test
+		for (int testCount = 30; testCount > 0; testCount--) {
+			functionWrapper = new CollectSinkFunctionTestWrapper<>(serializer, MAX_RESULTS_PER_BATCH * 4);
+			jobFinished = false;
+
+			List<Integer> expected = new ArrayList<>();
+			for (int i = 0; i < 50; i++) {
+				expected.add(i);
+			}
+			Thread feeder = new ThreadWithException(new CheckpointedDataFeeder(expected));
+
+			List<Integer> actual = runFunctionRandomTest(feeder);
+			assertResultsEqualAfterSort(expected, actual);
+
+			functionWrapper.closeWrapper();
+		}
+	}
+
+	private List<Integer> runFunctionRandomTest(Thread feeder) throws Exception {
+		CollectClient collectClient = new CollectClient();
+		Thread client = new ThreadWithException(collectClient);
+
+		Thread.UncaughtExceptionHandler exceptionHandler = (t, e) -> {
+			feeder.interrupt();
+			client.interrupt();
+			e.printStackTrace();
+		};
+		feeder.setUncaughtExceptionHandler(exceptionHandler);
+		client.setUncaughtExceptionHandler(exceptionHandler);
+
+		feeder.start();
+		client.start();
+		feeder.join();
+		client.join();
+
+		return collectClient.results;
+	}
+
+	private void assertResultsEqualAfterSort(List<Integer> expected, List<Integer> actual) {
+		Collections.sort(expected);
+		Collections.sort(actual);
+		Assert.assertThat(actual, CoreMatchers.is(expected));
+	}
+
+	/**
+	 * A {@link RunnableWithException} feeding data to the function. It will fail when half of the data is fed.
+	 */
+	private class UncheckpointedDataFeeder implements RunnableWithException {
+
+		private LinkedList<Integer> data;
+		private final List<Integer> checkpointedData;

Review comment:
       `originalData` might be the proper name.
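
To illustrate why `originalData` fits better, here is a rough sketch of an uncheckpointed feeder, assuming that after the simulated failure it has nothing to restore and simply starts over from the full input. The class `RestartingFeederSketch` and its fields `emitted` and `restarts` are hypothetical and are not the test code in this PR. Clearing `emitted` on restart is also an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

public class RestartingFeederSketch {

    // Immutable copy of the full input; nothing here comes from a checkpoint.
    private final List<Integer> originalData;
    // Working queue of values that still have to be fed.
    private LinkedList<Integer> data;
    private boolean failedOnce = false;

    // Values fed during the current attempt (assumption: discarded on restart).
    final List<Integer> emitted = new ArrayList<>();
    int restarts = 0;

    RestartingFeederSketch(List<Integer> input) {
        this.originalData = Collections.unmodifiableList(new ArrayList<>(input));
        this.data = new LinkedList<>(originalData);
    }

    void run() {
        while (!data.isEmpty()) {
            if (!failedOnce && data.size() <= originalData.size() / 2) {
                // Simulated failure once half of the data is fed:
                // without a checkpoint, the only state to go back to is the original input.
                failedOnce = true;
                restarts++;
                emitted.clear();
                data = new LinkedList<>(originalData);
                continue;
            }
            emitted.add(data.removeFirst());
        }
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            input.add(i);
        }
        RestartingFeederSketch feeder = new RestartingFeederSketch(input);
        feeder.run();
        System.out.println(feeder.emitted + " after " + feeder.restarts + " restart(s)");
    }
}
```

The field only ever serves as the reset source, so a name containing "checkpointed" would suggest state that this feeder never has.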







[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * c3ac42923fc06b5d0d86c01c814918916ab86790 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4524) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * fb16ba0168cae5742f27c87acae3c477dd42150b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4717) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #12867: [FLINK-18558][streaming] Introduce collect iterator with at least once semantics and exactly once semantics without fault tolerance

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12867:
URL: https://github.com/apache/flink/pull/12867#issuecomment-656590407


   ## CI report:
   
   * aa18ae06c64ff88beba4bfe780f7de03d0dc77f1 UNKNOWN
   * 937eda6d9a43cfdfb24f97034ee120533c2c904b Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4552) 
   

