Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/02 13:15:32 UTC

[GitHub] [beam] scwhittle opened a new pull request, #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

scwhittle opened a new pull request, #24853:
URL: https://github.com/apache/beam/pull/24853

   This provides more output buffering, which ensures that output is not throttled on synchronization when message sizes exceed the 32KB gRPC isReady limit.
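
The idea in the description can be illustrated with a minimal sketch. It assumes a counter-based scheme like the one suggested by the `messagesSinceReady` and `messagesBetweenIsReadyChecks` fields in the diff; `ThrottledSender`, `trySend`, and `countReadyChecks` are hypothetical names for illustration, and the `BooleanSupplier` and `Consumer` stand in for the gRPC stream rather than reproducing the Beam or gRPC API.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

class BatchedReadyCheckSketch {

  /** Hypothetical sender that consults isReady only after a batch of messages. */
  static final class ThrottledSender<T> {
    private final BooleanSupplier isReady; // stand-in for the stream's readiness check
    private final Consumer<T> sink;        // stand-in for the stream's onNext
    private final int messagesBetweenIsReadyChecks;
    private int messagesSinceReady = 0;
    int isReadyCalls = 0;                  // instrumentation for the demo only

    ThrottledSender(BooleanSupplier isReady, Consumer<T> sink, int messagesBetweenIsReadyChecks) {
      this.isReady = isReady;
      this.sink = sink;
      // Clamp to at least 1 so the first message always passes without a check.
      this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
    }

    /** Returns true if the value was sent, false if the caller should wait and retry. */
    synchronized boolean trySend(T value) {
      if (messagesSinceReady < messagesBetweenIsReadyChecks) {
        // Still inside the current batch: send without asking isReady.
        messagesSinceReady++;
        sink.accept(value);
        return true;
      }
      isReadyCalls++;
      if (isReady.getAsBoolean()) {
        // The stream has drained enough; start a new batch.
        messagesSinceReady = 0;
        sink.accept(value);
        return true;
      }
      return false;
    }
  }

  /** Sends the given number of messages to an always-ready sink and counts isReady calls. */
  static int countReadyChecks(int messages, int messagesBetweenIsReadyChecks) {
    ThrottledSender<Integer> sender =
        new ThrottledSender<Integer>(() -> true, v -> {}, messagesBetweenIsReadyChecks);
    for (int i = 0; i < messages; i++) {
      sender.trySend(i);
    }
    return sender.isReadyCalls;
  }

  public static void main(String[] args) {
    System.out.println("isReady calls: " + countReadyChecks(100, 10));
  }
}
```

In this sketch, 100 messages with a batch size of 10 trigger 9 readiness checks, whereas the previous code in the diff checked `isReady` on every message except the first. A real observer would block (for example on the `Phaser`) when `trySend` returns false instead of handing the decision back to the caller.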
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] reuvenlax merged pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
reuvenlax merged PR #24853:
URL: https://github.com/apache/beam/pull/24853




[GitHub] [beam] reuvenlax commented on pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on PR #24853:
URL: https://github.com/apache/beam/pull/24853#issuecomment-1374027977

   Run Dataflow Streaming ValidatesRunner




[GitHub] [beam] scwhittle commented on pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
scwhittle commented on PR #24853:
URL: https://github.com/apache/beam/pull/24853#issuecomment-1369008460

   This is motivated by excessive onReady overhead observed in #24836.




[GitHub] [beam] scwhittle commented on pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
scwhittle commented on PR #24853:
URL: https://github.com/apache/beam/pull/24853#issuecomment-1369007871

   R: @reuvenlax




[GitHub] [beam] scwhittle commented on a diff in pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
scwhittle commented on code in PR #24853:
URL: https://github.com/apache/beam/pull/24853#discussion_r1063253469


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java:
##########
@@ -41,80 +41,92 @@
   private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
   private final Phaser phaser;
 
-  @GuardedBy("outboundObserver")
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
   private final CallStreamObserver<T> outboundObserver;
 
   private final long deadlineSeconds;
+  private final int messagesBetweenIsReadyChecks;
 
-  @GuardedBy("outboundObserver")
-  private boolean firstMessage = true;
+  @GuardedBy("lock")
+  private int messagesSinceReady = 0;
 
   public DirectStreamObserver(
-      Phaser phaser, CallStreamObserver<T> outboundObserver, long deadlineSeconds) {
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      long deadlineSeconds,
+      int messagesBetweenIsReadyChecks) {
     this.phaser = phaser;
     this.outboundObserver = outboundObserver;
     this.deadlineSeconds = deadlineSeconds;
+    // We always let the first message pass through without blocking because it is performed under
+    // the StreamPool synchronized block and single header message isn't going to cause memory
+    // issues due to excessive buffering within grpc.
+    this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
   }
 
   @Override
   public void onNext(T value) {
-    final int phase = phaser.getPhase();
+    int phase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
-    while (true) {
-      try {
-        synchronized (outboundObserver) {
-          // We let the first message passthrough without blocking because it is performed under the
-          // StreamPool synchronized block and single message isn't going to cause memory issues due
-          // to excessive buffering within grpc.
-          if (firstMessage || outboundObserver.isReady()) {
-            firstMessage = false;
-            outboundObserver.onNext(value);
-            return;
+    synchronized (lock) {

Review Comment:
   I was thinking it would reduce acquiring and releasing the lock, since other callers will be waiting on the phaser anyway.
   
   But on second thought, that only reduces lock/unlock on timeouts, and it could have weird interactions between waiting on the phaser and blocking onError/onCompleted. Changing it back.
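
The concern about holding the lock while waiting can be sketched as follows. This is a simplified illustration, not the code from the PR: `LockScopeSketch`, `send`, and `demo` are hypothetical names, and the `ready` flag stands in for `outboundObserver.isReady()`. The lock is taken once per attempt and released before waiting on the `Phaser`, so a concurrent `onCompleted` is never blocked behind a waiting sender.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class LockScopeSketch {
  private final Object lock = new Object();
  private final Phaser phaser = new Phaser(1); // one party: each arrive() advances the phase
  private boolean ready = false;      // guarded by lock; stand-in for isReady()
  private boolean completed = false;  // guarded by lock

  /** Returns true once "sent", false if the stream completed or the wait budget ran out. */
  boolean send(long maxWaitMillis) throws InterruptedException {
    long waitedMillis = 0;
    while (true) {
      int phase;
      synchronized (lock) {
        // Read the phase before checking state so a later arrive() is not missed.
        phase = phaser.getPhase();
        if (completed) {
          return false;
        }
        if (ready) {
          return true;
        }
      }
      // The lock is released here, so onCompleted() can acquire it while we wait.
      if (waitedMillis >= maxWaitMillis) {
        return false;
      }
      try {
        phaser.awaitAdvanceInterruptibly(phase, 50, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        waitedMillis += 50;
      }
    }
  }

  void onCompleted() {
    synchronized (lock) {
      completed = true;
    }
    phaser.arrive(); // wake any sender waiting on the current phase
  }

  /** Returns true if a sender waiting for readiness is released promptly by onCompleted(). */
  static boolean demo() {
    LockScopeSketch sketch = new LockScopeSketch();
    AtomicBoolean sent = new AtomicBoolean(true);
    Thread sender = new Thread(() -> {
      try {
        sent.set(sketch.send(5_000));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    sender.start();
    sketch.onCompleted(); // does not block: the sender never holds the lock while waiting
    try {
      sender.join(5_000);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !sender.isAlive() && !sent.get();
  }

  public static void main(String[] args) {
    System.out.println("sender released by onCompleted: " + demo());
  }
}
```

If the `synchronized` block instead wrapped the whole loop, `onCompleted` would have to wait until the sender timed out before it could take the lock, which is the interaction the comment above describes.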





[GitHub] [beam] reuvenlax commented on a diff in pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on code in PR #24853:
URL: https://github.com/apache/beam/pull/24853#discussion_r1062933103


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java:
##########
@@ -41,80 +41,92 @@
   private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
   private final Phaser phaser;
 
-  @GuardedBy("outboundObserver")
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
   private final CallStreamObserver<T> outboundObserver;
 
   private final long deadlineSeconds;
+  private final int messagesBetweenIsReadyChecks;
 
-  @GuardedBy("outboundObserver")
-  private boolean firstMessage = true;
+  @GuardedBy("lock")
+  private int messagesSinceReady = 0;
 
   public DirectStreamObserver(
-      Phaser phaser, CallStreamObserver<T> outboundObserver, long deadlineSeconds) {
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      long deadlineSeconds,
+      int messagesBetweenIsReadyChecks) {
     this.phaser = phaser;
     this.outboundObserver = outboundObserver;
     this.deadlineSeconds = deadlineSeconds;
+    // We always let the first message pass through without blocking because it is performed under
+    // the StreamPool synchronized block and single header message isn't going to cause memory
+    // issues due to excessive buffering within grpc.
+    this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
   }
 
   @Override
   public void onNext(T value) {
-    final int phase = phaser.getPhase();
+    int phase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
-    while (true) {
-      try {
-        synchronized (outboundObserver) {
-          // We let the first message passthrough without blocking because it is performed under the
-          // StreamPool synchronized block and single message isn't going to cause memory issues due
-          // to excessive buffering within grpc.
-          if (firstMessage || outboundObserver.isReady()) {
-            firstMessage = false;
-            outboundObserver.onNext(value);
-            return;
+    synchronized (lock) {

Review Comment:
   What's the reason for moving the lock outside of the while loop?





[GitHub] [beam] reuvenlax commented on pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
reuvenlax commented on PR #24853:
URL: https://github.com/apache/beam/pull/24853#issuecomment-1374020380

   Run Dataflow ValidatesRunnerStreaming




[GitHub] [beam] github-actions[bot] commented on pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #24853:
URL: https://github.com/apache/beam/pull/24853#issuecomment-1369009134

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

