Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/19 21:49:42 UTC

[GitHub] [beam] ibzib opened a new pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

ibzib opened a new pull request #12637:
URL: https://github.com/apache/beam/pull/12637


   R: @robertwb 
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] commented on pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #12637:
URL: https://github.com/apache/beam/pull/12637#issuecomment-695098854


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request head (`BEAM-10768@97d548e`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-head-commit).
   > The diff coverage is `n/a`.
   





[GitHub] [beam] ibzib commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492386778



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       Write does not provide ordering guarantees in this case.
   
   Elements are stored in a [queue](https://github.com/apache/beam/blob/7b3d4251d244c10545fb37f1d93ebcad84a98681/sdks/python/apache_beam/runners/worker/data_plane.py#L371) before being sent, to enable batching. Elements aren't added to that queue until the [flush callback](https://github.com/apache/beam/blob/7b3d4251d244c10545fb37f1d93ebcad84a98681/sdks/python/apache_beam/runners/worker/data_plane.py#L493) is invoked. Because the flush callback is [invoked periodically](https://github.com/apache/beam/blob/7b3d4251d244c10545fb37f1d93ebcad84a98681/sdks/python/apache_beam/runners/worker/data_plane.py#L182) starting from when a stream is constructed, there is no guarantee that one stream's callback is called before the other.
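
To make the race concrete, here is a minimal sketch under stated assumptions. `BufferedStream`, `drain`, and `run` are hypothetical stand-ins written for this illustration and are not Beam's actual classes or API; the flush callbacks are invoked by hand rather than by a timer so the outcome is deterministic. It only shows that when each stream buffers locally and flushes independently into a shared queue, the order on the queue follows the flush order, not the write order.

```python
import queue


class BufferedStream:
    """Hypothetical stand-in for an output stream that buffers writes."""

    def __init__(self, instruction_id, shared_queue):
        self.instruction_id = instruction_id
        self._shared_queue = shared_queue
        self._buffer = []

    def write(self, data):
        # A write only touches the local buffer; nothing is queued yet.
        self._buffer.append(data)

    def flush(self):
        # The flush callback moves buffered data onto the shared send queue.
        for data in self._buffer:
            self._shared_queue.put((self.instruction_id, data))
        self._buffer = []


def drain(q):
    """Return everything currently on the queue, in queue order."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items


def run(flush_order):
    """Write to stream '1' then stream '2', then flush in the given order."""
    q = queue.Queue()
    streams = {'1': BufferedStream('1', q), '2': BufferedStream('2', q)}
    streams['1'].write(b'abc')
    streams['2'].write(b'def')
    for instruction_id in flush_order:
        streams[instruction_id].flush()
    return drain(q)


in_write_order = run(['1', '2'])
reversed_order = run(['2', '1'])
```

Both runs deliver the same two elements, so a test that compares the received elements without regard to order (for example by sorting them first) passes in either case, while a test that asserts one specific sequence fails whenever the second stream's flush happens to run first.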
   

##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       > Please add details as comment.
   
   Done.







[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r487190235



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -99,35 +100,33 @@ def send(instruction_id, transform_id, data):
 
     # Single write.
     send('0', transform_1, b'abc')
-    self.assertEqual(
+    hc.assert_that(

Review comment:
       Yes, the stream should be created outside of send so that its lifetime can be explicitly managed in the test based upon what the test is trying to do.
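
A rough sketch of what explicitly managing the stream's lifetime could look like. `FakeChannel` and `FakeStream` are hypothetical stand-ins for this illustration only, and the assumption that `close()` flushes the buffered data is made for the sketch rather than taken from Beam's implementation.

```python
class FakeStream:
    """Hypothetical stand-in for an output stream; not Beam's real class."""

    def __init__(self, received, instruction_id, transform_id):
        self._received = received
        self._key = (instruction_id, transform_id)
        self._buffer = []
        self.closed = False

    def write(self, data):
        if self.closed:
            raise ValueError('write to closed stream')
        self._buffer.append(data)

    def close(self):
        # Assumption for this sketch: closing flushes whatever is buffered.
        self._received.extend(self._key + (data,) for data in self._buffer)
        self._buffer = []
        self.closed = True


class FakeChannel:
    """Hypothetical channel that records what its streams deliver."""

    def __init__(self):
        self.received = []

    def output_stream(self, instruction_id, transform_id):
        return FakeStream(self.received, instruction_id, transform_id)


channel = FakeChannel()

# The test owns the stream, so it decides when the stream is closed.
stream = channel.output_stream('0', 'transform_1')
stream.write(b'abc')
stream.write(b'def')
received_before_close = list(channel.received)
stream.close()
received_after_close = list(channel.received)
```

With the stream held by the test rather than created and closed inside a helper, several writes can share one stream and the point at which they become visible is under the test's control.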







[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492832184



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       I would have expected that `from_channel.output_stream` would share the same queue of elements across distinct transforms when dealing with the same instruction id instead of multiple different queues with separate flushing characteristics.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r487165664



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -99,35 +100,33 @@ def send(instruction_id, transform_id, data):
 
     # Single write.
     send('0', transform_1, b'abc')
-    self.assertEqual(
+    hc.assert_that(

Review comment:
       So is the correct fix to create `stream` outside of `send` and re-use it? Or do we need a stream per assertion?











[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r474178233



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -99,35 +100,33 @@ def send(instruction_id, transform_id, data):
 
     # Single write.
     send('0', transform_1, b'abc')
-    self.assertEqual(
+    hc.assert_that(

Review comment:
       I don't think this change makes the test check the correct expectations, since we should be testing that multiple messages are received in order over the channel.
   
   I believe `send` doesn't have the correct semantics: we want a persistent stream for the life of this test, and we want to control when it is closed once sending for a specific `(instruction, transform)` pair is done.







[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492828079



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       Please add details as comment.







[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492359619



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       Why do we need to wait for the flush? Shouldn't the earlier `stream21.write(b'def')` provide the correct ordering?










[GitHub] [beam] tvalentyn commented on pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #12637:
URL: https://github.com/apache/beam/pull/12637#issuecomment-692843176


   I'm seeing this flake in precommits; thanks for fixing it.











[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r474179131



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -99,35 +100,33 @@ def send(instruction_id, transform_id, data):
 
     # Single write.
     send('0', transform_1, b'abc')
-    self.assertEqual(
+    hc.assert_that(

Review comment:
       Also, we may want to add a sleep to the time-based test so that a flush is sometimes forced between messages (which likely means we should test with more messages).
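
One way this suggestion might be sketched, under loud assumptions: `PeriodicFlusher` and `send_with_pauses` are hypothetical names invented for this illustration, the intervals are arbitrary, and none of it reflects how Beam's data plane schedules its flushes. A background thread flushes a buffer periodically, and the sender sleeps between writes so that flushes can land between messages.

```python
import threading
import time


class PeriodicFlusher:
    """Hypothetical buffered writer flushed by a background timer thread."""

    def __init__(self, interval_s):
        self._interval_s = interval_s
        self._lock = threading.Lock()
        self._buffer = []
        self.batches = []  # One entry per flush that found buffered data.
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def write(self, data):
        with self._lock:
            self._buffer.append(data)

    def _flush(self):
        with self._lock:
            if self._buffer:
                self.batches.append(self._buffer)
                self._buffer = []

    def _run(self):
        # Event.wait returns False on timeout and True once stop is set.
        while not self._stop.wait(self._interval_s):
            self._flush()

    def close(self):
        self._stop.set()
        self._thread.join()
        self._flush()  # Deliver anything written since the last timed flush.


def send_with_pauses(messages, interval_s=0.02, pause_s=0.2):
    """Write each message, sleeping long enough for a timed flush to run."""
    flusher = PeriodicFlusher(interval_s)
    for message in messages:
        flusher.write(message)
        time.sleep(pause_s)
    flusher.close()
    return flusher.batches


batches = send_with_pauses([b'abc', b'def', b'ghi'])
flattened = [data for batch in batches for data in batch]
```

How the messages are split into batches depends on timing, so a test built this way should assert only the flattened order within a single stream, which holds no matter where the flushes fall.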







[GitHub] [beam] ibzib merged pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
ibzib merged pull request #12637:
URL: https://github.com/apache/beam/pull/12637


   





[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492359619



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       Why do we need to wait for the flush? Shouldn't the earlier `stream21.write(b'def')` provide the correct ordering?

##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       Please add these details as a comment.

##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       I would have expected that `from_channel.output_stream` would share the same queue of elements when dealing with the same instruction id instead of two different queues with separate flushing characteristics.

##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       I would have expected that `from_channel.output_stream` would share the same queue of elements across distinct transforms when dealing with the same instruction id instead of multiple different queues with separate flushing characteristics.







[GitHub] [beam] lukecwik commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492832184



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       I would have expected that `from_channel.output_stream` would share the same queue of elements when dealing with the same instruction id instead of two different queues with separate flushing characteristics.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12637:
URL: https://github.com/apache/beam/pull/12637#issuecomment-695098854


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=h1) Report
   > Merging [#12637](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/f4c2734261396858e388ebef2eef50e7d48231a8?el=desc) will **increase** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12637/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12637      +/-   ##
   ==========================================
   + Coverage   82.31%   82.34%   +0.02%     
   ==========================================
     Files         451      454       +3     
     Lines       53874    54119     +245     
   ==========================================
   + Hits        44349    44566     +217     
   - Misses       9525     9553      +28     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...am/runners/interactive/options/capture\_limiters.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9vcHRpb25zL2NhcHR1cmVfbGltaXRlcnMucHk=) | `90.47% <0.00%> (-3.08%)` | :arrow_down: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `96.49% <0.00%> (-1.76%)` | :arrow_down: |
   | [...dks/python/apache\_beam/runners/pipeline\_context.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9waXBlbGluZV9jb250ZXh0LnB5) | `92.80% <0.00%> (-1.01%)` | :arrow_down: |
   | [...s/python/apache\_beam/io/gcp/bigquery\_file\_loads.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X2ZpbGVfbG9hZHMucHk=) | `89.97% <0.00%> (-0.63%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `88.80% <0.00%> (-0.54%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.75% <0.00%> (-0.45%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/fn\_runner.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2ZuX3J1bm5lci5weQ==) | `89.53% <0.00%> (-0.21%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `91.97% <0.00%> (-0.19%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.45% <0.00%> (-0.14%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `89.83% <0.00%> (-0.07%)` | :arrow_down: |
   | ... and [14 more](https://codecov.io/gh/apache/beam/pull/12637/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=footer). Last update [f4c2734...a4ea68c](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] ibzib commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r492386778



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -108,16 +106,28 @@ def send(instruction_id, transform_id, data):
         ])
 
     # Multiple interleaved writes to multiple instructions.
-    send('1', transform_1, b'abc')
-    send('2', transform_1, b'def')
+    stream11 = from_channel.output_stream('1', transform_1)
+    stream11.write(b'abc')
+    stream21 = from_channel.output_stream('2', transform_1)
+    stream21.write(b'def')
+    if not time_based_flush:
+      stream11.close()
     self.assertEqual(
         list(
             itertools.islice(to_channel.input_elements('1', [transform_1]), 1)),
         [
             beam_fn_api_pb2.Elements.Data(
                 instruction_id='1', transform_id=transform_1, data=b'abc')
         ])
-    send('2', transform_2, b'ghi')
+    if time_based_flush:

Review comment:
       Write does not provide ordering guarantees in this case.
   
   Elements are stored in a [queue](https://github.com/apache/beam/blob/7b3d4251d244c10545fb37f1d93ebcad84a98681/sdks/python/apache_beam/runners/worker/data_plane.py#L371) before being sent, to enable batching. Elements aren't added to that queue until the [flush callback](https://github.com/apache/beam/blob/7b3d4251d244c10545fb37f1d93ebcad84a98681/sdks/python/apache_beam/runners/worker/data_plane.py#L493) is invoked. Because the flush callback is [invoked periodically](https://github.com/apache/beam/blob/7b3d4251d244c10545fb37f1d93ebcad84a98681/sdks/python/apache_beam/runners/worker/data_plane.py#L182) starting from when a stream is constructed, there is no guarantee that one stream's callback is called before the other.
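A minimal sketch of the behavior described above, under stated assumptions: `BufferedStream` and `drain` are hypothetical names, and the flush callbacks are invoked by hand to stand in for independent periodic timers. It is not the code in `data_plane.py`.

```python
import collections
import queue


class BufferedStream:
    """Hypothetical stream whose writes reach the shared queue only on flush."""

    def __init__(self, instruction_id, shared_queue):
        self._instruction_id = instruction_id
        self._buffer = []
        self._shared_queue = shared_queue

    def write(self, data):
        # A write only lands in this stream's private buffer.
        self._buffer.append(data)

    def flush(self):
        # The flush callback moves buffered elements into the shared queue.
        for data in self._buffer:
            self._shared_queue.put((self._instruction_id, data))
        self._buffer.clear()


def drain(q):
    items = []
    while not q.empty():
        items.append(q.get())
    return items


send_queue = queue.Queue()
stream11 = BufferedStream('1', send_queue)
stream21 = BufferedStream('2', send_queue)
stream11.write(b'abc')
stream21.write(b'def')

# Each stream's callback runs on its own schedule; suppose the second
# stream's callback happens to run first.
stream21.flush()
stream11.flush()
received = drain(send_queue)

expected = [('1', b'abc'), ('2', b'def')]
arrival_order_matches = received == expected
same_elements = (
    collections.Counter(received) == collections.Counter(expected))
```

Here the write order and the arrival order differ, so an ordered comparison fails while an order-insensitive comparison of the same elements passes.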
   







[GitHub] [beam] ibzib commented on a change in pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #12637:
URL: https://github.com/apache/beam/pull/12637#discussion_r491203318



##########
File path: sdks/python/apache_beam/runners/worker/data_plane_test.py
##########
@@ -99,35 +100,33 @@ def send(instruction_id, transform_id, data):
 
     # Single write.
     send('0', transform_1, b'abc')
-    self.assertEqual(
+    hc.assert_that(

Review comment:
       I implemented some of your suggestions, Luke. PTAL.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12637:
URL: https://github.com/apache/beam/pull/12637#issuecomment-695098854


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12637?src=pr&el=h1) Report
   > :exclamation: No coverage uploaded for pull request head (`BEAM-10768@a4ea68c`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-head-commit).
   > The diff coverage is `n/a`.
   





[GitHub] [beam] lukecwik commented on pull request #12637: [BEAM-10768] Don't assert the order in which elements are received.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12637:
URL: https://github.com/apache/beam/pull/12637#issuecomment-677821617


   R: @lukecwik

