You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/05 22:04:45 UTC

[GitHub] [beam] reuvenlax commented on a diff in pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

reuvenlax commented on code in PR #24853:
URL: https://github.com/apache/beam/pull/24853#discussion_r1062933103


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java:
##########
@@ -41,80 +41,92 @@
   private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
   private final Phaser phaser;
 
-  @GuardedBy("outboundObserver")
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
   private final CallStreamObserver<T> outboundObserver;
 
   private final long deadlineSeconds;
+  private final int messagesBetweenIsReadyChecks;
 
-  @GuardedBy("outboundObserver")
-  private boolean firstMessage = true;
+  @GuardedBy("lock")
+  private int messagesSinceReady = 0;
 
   public DirectStreamObserver(
-      Phaser phaser, CallStreamObserver<T> outboundObserver, long deadlineSeconds) {
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      long deadlineSeconds,
+      int messagesBetweenIsReadyChecks) {
     this.phaser = phaser;
     this.outboundObserver = outboundObserver;
     this.deadlineSeconds = deadlineSeconds;
+    // We always let the first message pass through without blocking because it is performed under
+    // the StreamPool synchronized block and single header message isn't going to cause memory
+    // issues due to excessive buffering within grpc.
+    this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
   }
 
   @Override
   public void onNext(T value) {
-    final int phase = phaser.getPhase();
+    int phase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
-    while (true) {
-      try {
-        synchronized (outboundObserver) {
-          // We let the first message passthrough without blocking because it is performed under the
-          // StreamPool synchronized block and single message isn't going to cause memory issues due
-          // to excessive buffering within grpc.
-          if (firstMessage || outboundObserver.isReady()) {
-            firstMessage = false;
-            outboundObserver.onNext(value);
-            return;
+    synchronized (lock) {

Review Comment:
   what's the reason to move the lock to outside of the while loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org