Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/19 22:24:14 UTC

[GitHub] [beam] lukecwik opened a new pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

lukecwik opened a new pull request #12639:
URL: https://github.com/apache/beam/pull/12639


   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on a change in pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12639:
URL: https://github.com/apache/beam/pull/12639#discussion_r474135224



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
##########
@@ -352,6 +354,35 @@ public void testRegistration() {
       fail("Expected registrar not found.");
     }
 
+    @Test
+    public void testSplittingBeforeStartBundle() throws Exception {
+      List<WindowedValue<String>> outputValues = new ArrayList<>();
+      BeamFnDataReadRunner<String> readRunner =
+          createReadRunner(outputValues::add, PTRANSFORM_ID, mockBeamFnDataClient);
+      // The split should happen at 5 since the allowedSplitPoints is empty.
+      assertEquals(

Review comment:
       I may be misunderstanding something here, but I thought we don't perform splitting if the bundle has not started.







[GitHub] [beam] lukecwik merged pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #12639:
URL: https://github.com/apache/beam/pull/12639


   





[GitHub] [beam] lukecwik commented on pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #12639:
URL: https://github.com/apache/beam/pull/12639#issuecomment-676765571


   R: @boyuanzz 





[GitHub] [beam] lukecwik commented on a change in pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12639:
URL: https://github.com/apache/beam/pull/12639#discussion_r474185716



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
##########
@@ -352,6 +354,35 @@ public void testRegistration() {
       fail("Expected registrar not found.");
     }
 
+    @Test
+    public void testSplittingBeforeStartBundle() throws Exception {
+      List<WindowedValue<String>> outputValues = new ArrayList<>();
+      BeamFnDataReadRunner<String> readRunner =
+          createReadRunner(outputValues::add, PTRANSFORM_ID, mockBeamFnDataClient);
+      // The split should happen at 5 since the allowedSplitPoints is empty.
+      assertEquals(

Review comment:
       We always did before as well, since `registerInputLocation` doesn't mean we have seen any elements yet, just that we are eligible to get them. This slightly expands the time frame before the first element is received.







[GitHub] [beam] lukecwik commented on a change in pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12639:
URL: https://github.com/apache/beam/pull/12639#discussion_r474144618



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
##########
@@ -352,6 +354,35 @@ public void testRegistration() {
       fail("Expected registrar not found.");
     }
 
+    @Test
+    public void testSplittingBeforeStartBundle() throws Exception {
+      List<WindowedValue<String>> outputValues = new ArrayList<>();
+      BeamFnDataReadRunner<String> readRunner =
+          createReadRunner(outputValues::add, PTRANSFORM_ID, mockBeamFnDataClient);
+      // The split should happen at 5 since the allowedSplitPoints is empty.
+      assertEquals(

Review comment:
       A bundle can be started before each transform's `start` method has been invoked, since we invoke `start` starting from the leaves and going to the parents, and then we invoke `finish` in reverse order.
   
   Having this definition of `started` allows us to get msec counters for `start` methods, since they can be slow, and to checkpoint without processing through this read transform...
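
The ordering described in this comment can be illustrated with a minimal sketch. It is not the harness code: `BundleLifecycleSketch`, `runBundle`, and the transform names are hypothetical, and the sketch only records the order of calls, assuming a linear chain of transforms listed from root (parent) to leaf.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names come from the Beam harness.
final class BundleLifecycleSketch {

  /**
   * Records the order of lifecycle calls for a linear chain of transforms
   * given from root (parent) to leaf.
   */
  static List<String> runBundle(List<String> rootToLeaf) {
    List<String> calls = new ArrayList<>();

    // The bundle counts as started before any transform's start method runs.
    calls.add("bundle:started");

    // start is invoked from the leaves going up to the parents.
    for (int i = rootToLeaf.size() - 1; i >= 0; i--) {
      calls.add("start:" + rootToLeaf.get(i));
    }

    // finish is invoked in the reverse order, from the parents down to the leaves.
    for (String transform : rootToLeaf) {
      calls.add("finish:" + transform);
    }
    return calls;
  }
}
```

Under this model, a read transform at the root is the last to see `start`, so there is a window in which the bundle has started but the read has not.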







[GitHub] [beam] boyuanzz commented on a change in pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12639:
URL: https://github.com/apache/beam/pull/12639#discussion_r473509304



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
##########
@@ -237,8 +232,9 @@ public void trySplit(
     }
 
     synchronized (splittingLock) {
-      // Don't attempt to split if we haven't started.
-      if (!started) {
+      // Don't attempt to split if we are already done since there isn't a meaningful split we can
+      // provide.
+      if (index == stopIndex) {

Review comment:
       Should we also check `index != -1`?







[GitHub] [beam] boyuanzz commented on a change in pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12639:
URL: https://github.com/apache/beam/pull/12639#discussion_r474173302



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
##########
@@ -352,6 +354,35 @@ public void testRegistration() {
       fail("Expected registrar not found.");
     }
 
+    @Test
+    public void testSplittingBeforeStartBundle() throws Exception {
+      List<WindowedValue<String>> outputValues = new ArrayList<>();
+      BeamFnDataReadRunner<String> readRunner =
+          createReadRunner(outputValues::add, PTRANSFORM_ID, mockBeamFnDataClient);
+      // The split should happen at 5 since the allowedSplitPoints is empty.
+      assertEquals(

Review comment:
       I see. So that means we are now allowing splitting before processing gets started, right?







[GitHub] [beam] lukecwik commented on a change in pull request #12639: [BEAM-9979] Fix read index being reported to be reset after any metrics request could happen for the prior bundle and before any metrics request for the next bundle.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12639:
URL: https://github.com/apache/beam/pull/12639#discussion_r474104405



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
##########
@@ -237,8 +232,9 @@ public void trySplit(
     }
 
     synchronized (splittingLock) {
-      // Don't attempt to split if we haven't started.
-      if (!started) {
+      // Don't attempt to split if we are already done since there isn't a meaningful split we can
+      // provide.
+      if (index == stopIndex) {

Review comment:
       No, the logic below will allow us to choose a stopIndex even if `registerInputLocation` has not been invoked. See `testSplittingWhenNoElementsProcessed`. I added a variant of that test where we split before `registerInputLocation` happens to get coverage for this case.
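
To make the `index == stopIndex` guard concrete, here is a simplified sketch of a split that can succeed before any element has arrived. It is a model under stated assumptions, not the code in `BeamFnDataReadRunner`: the class `SplitGuardSketch`, the methods `accept` and `finish`, the `NO_SPLIT` sentinel, and the way the split point is computed are all hypothetical. It assumes `index` starts at -1 and that finishing sets `stopIndex` to the number of elements processed.

```java
// Hypothetical model of the guard discussed above; not the Beam implementation.
final class SplitGuardSketch {
  static final long NO_SPLIT = -1;

  private final Object splittingLock = new Object();
  private long index = -1;                  // -1: no element received yet (assumption)
  private long stopIndex = Long.MAX_VALUE;  // unbounded until a split or finish

  /** Called once per incoming element; returns false once the stop index is reached. */
  boolean accept() {
    synchronized (splittingLock) {
      if (index + 1 >= stopIndex) {
        return false;
      }
      index++;
      return true;
    }
  }

  /** Marks the read as done so that index == stopIndex afterwards. */
  void finish() {
    synchronized (splittingLock) {
      index += 1;
      stopIndex = index;
    }
  }

  /** Returns the new stop index, or NO_SPLIT if no meaningful split exists. */
  long trySplit(double fractionOfRemainder, long estimatedTotal) {
    synchronized (splittingLock) {
      // Already done: there is no meaningful split to provide.
      if (index == stopIndex) {
        return NO_SPLIT;
      }
      long end = Math.min(stopIndex, estimatedTotal);
      long firstUnprocessed = index + 1; // 0 when nothing has been received yet
      long remaining = end - firstUnprocessed;
      if (remaining <= 0) {
        return NO_SPLIT;
      }
      long keep = (long) Math.floor(remaining * fractionOfRemainder);
      long newStop = firstUnprocessed + keep;
      if (newStop >= end) {
        return NO_SPLIT;
      }
      stopIndex = newStop;
      return newStop;
    }
  }
}
```

In this model, no separate `index != -1` check is needed: with `index == -1` the first unprocessed position is 0, so a split point can still be chosen from the estimated total.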



