Posted to github@beam.apache.org by "ryucc (via GitHub)" <gi...@apache.org> on 2023/04/14 20:23:51 UTC

[GitHub] [beam] ryucc opened a new pull request, #26287: Rename tryStartBundle to countNewElement.

ryucc opened a new pull request, #26287:
URL: https://github.com/apache/beam/pull/26287

   The current implementation tries to start the bundle, then adds an element to it. `countNewElement` is a better name because:
   
   1. A new element is always added, but a bundle is not always started.
   2. The only caller of this method is `DoFnOp.processElement`, where the new name reads better in context.
   
   We can also add a `startBundle` method to the interface in the future, but there are currently no use cases.
   
   The new name makes adding an element the main action and starting the bundle a lazy side effect, which is closer to what the method actually does.
   
   One symmetry this breaks is in `DoFnOp.processElement`: we had `tryStartBundle` paired with `tryFinishBundle`, and now we have `countNewElement` paired with `tryFinishBundle`. I think the old reading suggests one element per bundle, while the new reading suggests multiple elements per bundle, with a check after each add for whether the bundle is full or should be finished.
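
The semantics described above (an element is always counted, a bundle is opened lazily, and a fullness check runs after each add) can be illustrated with a minimal sketch. The class `CountingBundleSketch`, its fields, and the size-only closing rule are assumptions made for illustration; this is not the Samza runner's actual `BundleManager`, which also has time-based and listener behavior that is left out here.

```java
// Hypothetical sketch: names and bodies are illustrative only and are
// not the Samza runner's actual BundleManager implementation.
final class CountingBundleSketch {
  private final long maxBundleSize;
  private boolean bundleStarted = false;
  private long elementCount = 0;
  private int bundlesStarted = 0;
  private int bundlesFinished = 0;

  CountingBundleSketch(long maxBundleSize) {
    if (maxBundleSize < 1) {
      throw new IllegalArgumentException("maxBundleSize must be at least 1");
    }
    this.maxBundleSize = maxBundleSize;
  }

  /** Always counts the element; opens a bundle only if none is open. */
  void countNewElement() {
    if (!bundleStarted) {
      bundleStarted = true;
      bundlesStarted++;
    }
    elementCount++;
  }

  /** Closes the open bundle once it is full; otherwise does nothing. */
  void tryFinishBundle() {
    if (bundleStarted && elementCount >= maxBundleSize) {
      bundleStarted = false;
      elementCount = 0;
      bundlesFinished++;
    }
  }

  boolean isBundleStarted() {
    return bundleStarted;
  }

  long getElementCount() {
    return elementCount;
  }

  int getBundlesStarted() {
    return bundlesStarted;
  }

  int getBundlesFinished() {
    return bundlesFinished;
  }
}
```

With a maximum bundle size of 2, calling `countNewElement()` followed by `tryFinishBundle()` twice opens exactly one bundle and closes it after the second element, which is the "multiple elements per bundle" reading.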
   
   Other changes:
   1. Renamed some tests.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] xinyuiscool merged pull request #26287: Add JavaDoc to BundleManager

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool merged PR #26287:
URL: https://github.com/apache/beam/pull/26287




[GitHub] [beam] ryucc commented on a diff in pull request #26287: Rename tryStartBundle to countNewElement.

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #26287:
URL: https://github.com/apache/beam/pull/26287#discussion_r1167256752


##########
runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java:
##########
@@ -82,35 +82,20 @@ public void testTryStartBundleStartsBundle() {
     assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
   }
 
-  @Test
-  public void testTryStartBundleThrowsExceptionAndSignalError() {
+  @Test(expected = IllegalArgumentException.class)
+  public void testWhenCurrentBundleDoneFutureIsNotNullThenStartBundleFails() {
     bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
-    try {
-      bundleManager.tryStartBundle();
-    } catch (IllegalArgumentException e) {
-      bundleManager.signalFailure(e);
-    }
-
-    // verify if the signal failure only resets appropriate attributes of bundle
-    verify(mockFutureCollector, times(1)).prepare();
-    verify(mockFutureCollector, times(1)).discard();
-    assertEquals(
-        "Expected the number of element in the current bundle to 0",
-        0L,
-        bundleManager.getCurrentBundleElementCount());
-    assertEquals(
-        "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
-    assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
+    bundleManager.countNewElement();

Review Comment:
   This removed block is already tested in `testTryStartBundleThrowsExceptionFromTheListener`. The only difference is the cause of `tryStartBundle` failure. I'm isolating this part to a new and smaller test.





[GitHub] [beam] xinyuiscool commented on pull request #26287: Add JavaDoc to BundleManager

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool commented on PR #26287:
URL: https://github.com/apache/beam/pull/26287#issuecomment-1515557537

   The test failures are unrelated (they come from downloading dependencies from the mvn repo). Merging the PR since the Java precommit and other checks passed.




[GitHub] [beam] mynameborat commented on pull request #26287: Rename tryStartBundle to countNewElement.

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on PR #26287:
URL: https://github.com/apache/beam/pull/26287#issuecomment-1511741941

   Overall, I feel the PR doesn't improve much and instead introduces more confusion. Here are the reasons:
   
   > A new element is always added, but a bundle is not always started.
   
   The name `tryStartBundle` already reflects this, although in retrospect I feel this is something we should convey through API behavior rather than through names. For example, the method could simply be `start`, with semantics documented as idempotent within a bundle while triggering the start lifecycle for every new bundle.
   
   Additionally, the symmetry of `tryFinishBundle` with `countNewElement` doesn't read well. What does it mean for the caller to invoke `tryFinishBundle` when nothing tells the caller to start a bundle?
   
   > The only usage of this method is in DoFnOp.processElement, it reads better to context.
   
   I'm not sure I agree with this either. `DoFnOp.processElement` is a bridge between the Samza operator and Beam ParDo, and it interacts directly with `PushbackSideInputDoFnRunner`. The sequence for `PushbackSideInputDoFnRunner` is `startBundle`, many `processElementInReadyWindows` calls, and `finishBundle`, which expects callers to keep track of bundle boundaries and invoke these methods appropriately.
   
   So the existing names read well in that context: `tryStartBundle` --> `startBundle`, then `processElementInReadyWindows`, then `tryFinishBundle` --> `finishBundle`.
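
The call sequence described above can be sketched as follows. `RunnerSketch`, `RecordingRunner`, and `OperatorSketch` are hypothetical stand-ins: only the three lifecycle method names come from the discussion, and the signatures, the `String` element type, and the size-only bundle boundary are simplifying assumptions rather than the real `PushbackSideInputDoFnRunner` or `DoFnOp` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the runner; signatures are simplified for illustration.
interface RunnerSketch {
  void startBundle();

  void processElementInReadyWindows(String element);

  void finishBundle();
}

/** Records the order of lifecycle calls so the sequence can be inspected. */
final class RecordingRunner implements RunnerSketch {
  final List<String> calls = new ArrayList<>();

  @Override
  public void startBundle() {
    calls.add("startBundle");
  }

  @Override
  public void processElementInReadyWindows(String element) {
    calls.add("process:" + element);
  }

  @Override
  public void finishBundle() {
    calls.add("finishBundle");
  }
}

/** Hypothetical operator that tracks bundle boundaries on the runner's behalf. */
final class OperatorSketch {
  private final RunnerSketch runner;
  private final int maxBundleSize;
  private boolean bundleStarted = false;
  private int elementCount = 0;

  OperatorSketch(RunnerSketch runner, int maxBundleSize) {
    this.runner = runner;
    this.maxBundleSize = maxBundleSize;
  }

  void processElement(String element) {
    tryStartBundle();
    runner.processElementInReadyWindows(element);
    tryFinishBundle();
  }

  // Starts a bundle only if none is open, then counts the incoming element.
  private void tryStartBundle() {
    if (!bundleStarted) {
      bundleStarted = true;
      runner.startBundle();
    }
    elementCount++;
  }

  // Finishes the bundle once it holds maxBundleSize elements.
  private void tryFinishBundle() {
    if (bundleStarted && elementCount >= maxBundleSize) {
      runner.finishBundle();
      bundleStarted = false;
      elementCount = 0;
    }
  }
}
```

In this sketch the operator, not the runner, decides where bundle boundaries fall, which mirrors the point that callers are expected to track them.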
   
   That said, I do see scope for refactoring and improvement. For example:
   
   The existing bundle manager does two things 
   1. Handle bundle management (track bundles, invoke the start and finish lifecycle methods of the underlying runner)
   2. Process messages, handle watermark and process timers.
   
   It is the way it is because we don't wait until we have a complete bundle; instead, we delegate to the underlying runner as and when we receive elements. Because of this behavior, 1 & 2 exist together. It is possible to separate them and have a `BundleProcessor` that handles 2) and supports both early delegation (handling uncommitted bundles) and lazy delegation (at the end of a fully formed bundle), and that potentially handles watermarks and timers as well, depending on which model it runs.
   
   All our use cases (classic & portability) use the early delegation strategy, so the refactor mentioned above is not critical. There might be benefits in doing it if we plan to support multiple bundles, in which case both the bundle management and 2) might become a bit heavy.
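
As a rough illustration of the two delegation strategies mentioned above, here is a minimal sketch. The interface `BundleProcessorSketch` and both implementations are hypothetical; no such classes are confirmed to exist in the Samza runner, and watermark and timer handling are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface; the name and methods are assumptions for illustration.
interface BundleProcessorSketch<T> {
  void onElement(T element);

  void onBundleFinished();
}

/** Early delegation: each element is handed downstream as soon as it arrives. */
final class EarlyDelegation<T> implements BundleProcessorSketch<T> {
  private final Consumer<T> downstream;

  EarlyDelegation(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  @Override
  public void onElement(T element) {
    downstream.accept(element);
  }

  @Override
  public void onBundleFinished() {
    // Nothing is buffered, so there is nothing to flush.
  }
}

/** Lazy delegation: elements are buffered until the bundle is fully formed. */
final class LazyDelegation<T> implements BundleProcessorSketch<T> {
  private final Consumer<T> downstream;
  private final List<T> buffer = new ArrayList<>();

  LazyDelegation(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  @Override
  public void onElement(T element) {
    buffer.add(element);
  }

  @Override
  public void onBundleFinished() {
    // Flush the buffered bundle in arrival order, then start fresh.
    for (T element : buffer) {
      downstream.accept(element);
    }
    buffer.clear();
  }
}
```

The trade-off in this sketch is that early delegation needs no buffer but exposes downstream code to uncommitted bundles, while lazy delegation holds elements in memory until the bundle closes.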
   




[GitHub] [beam] ryucc commented on pull request #26287: Add JavaDoc to BundleManager

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on PR #26287:
URL: https://github.com/apache/beam/pull/26287#issuecomment-1512859973

   > The existing bundle manager does two things
   
   IMO two things is already too many. "Process messages, handle watermark and process timers." also makes it four things rather than two. My ideal BundleManager would only count the elements and look at the time window to decide whether the bundle is closed. The code that handles the watermark can ask BundleManager for the state of the bundle, but that code should not live in BundleManager.
   
   Not sure how much we agree on this, but I'm not changing this part at the moment. 
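
A narrow tracker of the kind described above might look like the following sketch. `BundleStateSketch`, its thresholds, and the caller-supplied timestamps are all assumptions made for illustration; this is not existing Beam code, and it intentionally knows nothing about watermarks or timers.

```java
// Hypothetical sketch of a bundle tracker that only counts elements and
// checks elapsed time; names and thresholds are illustrative assumptions.
final class BundleStateSketch {
  private final long maxElements;
  private final long maxWaitMillis;
  private long elementCount = 0;
  private long bundleStartMillis = 0;

  BundleStateSketch(long maxElements, long maxWaitMillis) {
    this.maxElements = maxElements;
    this.maxWaitMillis = maxWaitMillis;
  }

  /** Counts one element; the first element of a bundle records its start time. */
  void countNewElement(long nowMillis) {
    if (elementCount == 0) {
      bundleStartMillis = nowMillis;
    }
    elementCount++;
  }

  /** True when the open bundle is full or has been open for too long. */
  boolean shouldClose(long nowMillis) {
    if (elementCount == 0) {
      return false;
    }
    return elementCount >= maxElements || nowMillis - bundleStartMillis >= maxWaitMillis;
  }

  /** Called by the owner after it has closed the bundle. */
  void reset() {
    elementCount = 0;
  }

  long getElementCount() {
    return elementCount;
  }
}
```

Code that handles watermarks would then query `shouldClose(...)` rather than being part of the tracker itself. Passing the current time in as a parameter keeps the sketch deterministic and easy to test.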




[GitHub] [beam] ryucc commented on a diff in pull request #26287: Rename tryStartBundle to countNewElement.

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on code in PR #26287:
URL: https://github.com/apache/beam/pull/26287#discussion_r1167255993


##########
runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java:
##########
@@ -82,35 +82,20 @@ public void testTryStartBundleStartsBundle() {
     assertTrue("tryStartBundle() did not start the bundle", bundleManager.isBundleStarted());
   }
 
-  @Test
-  public void testTryStartBundleThrowsExceptionAndSignalError() {
+  @Test(expected = IllegalArgumentException.class)
+  public void testWhenCurrentBundleDoneFutureIsNotNullThenStartBundleFails() {
     bundleManager.setCurrentBundleDoneFuture(CompletableFuture.completedFuture(null));
-    try {
-      bundleManager.tryStartBundle();
-    } catch (IllegalArgumentException e) {
-      bundleManager.signalFailure(e);
-    }
-
-    // verify if the signal failure only resets appropriate attributes of bundle
-    verify(mockFutureCollector, times(1)).prepare();
-    verify(mockFutureCollector, times(1)).discard();
-    assertEquals(
-        "Expected the number of element in the current bundle to 0",
-        0L,
-        bundleManager.getCurrentBundleElementCount());
-    assertEquals(
-        "Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
-    assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());

Review Comment:
   This removed block is already tested in `testTryStartBundleThrowsExceptionFromTheListener`. The only difference is the cause of `tryStartBundle` failure. I'm isolating this part to a new and smaller test.





[GitHub] [beam] github-actions[bot] commented on pull request #26287: Rename tryStartBundle to countNewElement.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26287:
URL: https://github.com/apache/beam/pull/26287#issuecomment-1509302057

   Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`




[GitHub] [beam] xinyuiscool commented on a diff in pull request #26287: Add JavaDoc to BundleManager

Posted by "xinyuiscool (via GitHub)" <gi...@apache.org>.
xinyuiscool commented on code in PR #26287:
URL: https://github.com/apache/beam/pull/26287#discussion_r1171958069


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java:
##########
@@ -20,14 +20,39 @@
 import org.joda.time.Instant;
 
 public interface BundleManager<OutT> {

Review Comment:
   Add javadoc to the interface too. Thanks






[GitHub] [beam] ryucc commented on pull request #26287: Add JavaDoc to BundleManager

Posted by "ryucc (via GitHub)" <gi...@apache.org>.
ryucc commented on PR #26287:
URL: https://github.com/apache/beam/pull/26287#issuecomment-1512852100

   @mynameborat
   > The name tryStartBundle already reflects this although in retrospection, 
   
   To me, `tryStartBundle` doesn't reflect adding an element.
   
   I would read `tryStartBundle` as: if the bundle hasn't started, start it; otherwise do nothing.
   
   Also, having a bundle start with 1 element only half makes sense to me. I like starting with 0, because when I count, I start counting from 0. It gives me security.
   




[GitHub] [beam] alnzng commented on pull request #26287: Rename tryStartBundle to countNewElement.

Posted by "alnzng (via GitHub)" <gi...@apache.org>.
alnzng commented on PR #26287:
URL: https://github.com/apache/beam/pull/26287#issuecomment-1511843167

   +1 to sticking with the name `tryStartBundle`, for the following reasons:
   - `BundleManager` is literally the place that manages the lifecycle of a bundle of events. It makes sense for it to have a step/function that runs `StartBundle`, which maps to the DoFn's `StartBundle`[1].
   - The current code inside `tryStartBundle` does prepare for starting bundle processing. For example, `bundleProgressListener.onBundleStarted()` actually invokes the underlying DoFn's `PushbackSideInputDoFnRunner.StartBundle` method.
   - Other Beam runners (e.g. Flink) use similar naming to call out `StartBundle` in their logical "BundleManager"[2].
     - Flink doesn't abstract its "BundleManager" into a separate class the way Samza does today, but the intent of the related code is similar.
   
   
   
   [1] https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/ParDo.html
   [2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L674
   
   
   

