Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2023/01/06 09:01:15 UTC

[GitHub] [beam] scwhittle commented on a diff in pull request #24853: Modify windmill DirectStreamObserver to call isReady only every 10 messages by default

scwhittle commented on code in PR #24853:
URL: https://github.com/apache/beam/pull/24853#discussion_r1063253469


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java:
##########
@@ -41,80 +41,92 @@
   private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
   private final Phaser phaser;
 
-  @GuardedBy("outboundObserver")
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
   private final CallStreamObserver<T> outboundObserver;
 
   private final long deadlineSeconds;
+  private final int messagesBetweenIsReadyChecks;
 
-  @GuardedBy("outboundObserver")
-  private boolean firstMessage = true;
+  @GuardedBy("lock")
+  private int messagesSinceReady = 0;
 
   public DirectStreamObserver(
-      Phaser phaser, CallStreamObserver<T> outboundObserver, long deadlineSeconds) {
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      long deadlineSeconds,
+      int messagesBetweenIsReadyChecks) {
     this.phaser = phaser;
     this.outboundObserver = outboundObserver;
     this.deadlineSeconds = deadlineSeconds;
+    // We always let the first message pass through without blocking because it is performed under
+    // the StreamPool synchronized block and single header message isn't going to cause memory
+    // issues due to excessive buffering within grpc.
+    this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
   }
 
   @Override
   public void onNext(T value) {
-    final int phase = phaser.getPhase();
+    int phase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
-    while (true) {
-      try {
-        synchronized (outboundObserver) {
-          // We let the first message passthrough without blocking because it is performed under the
-          // StreamPool synchronized block and single message isn't going to cause memory issues due
-          // to excessive buffering within grpc.
-          if (firstMessage || outboundObserver.isReady()) {
-            firstMessage = false;
-            outboundObserver.onNext(value);
-            return;
+    synchronized (lock) {

Review Comment:
   I was thinking it would reduce grabbing/releasing the lock, since other callers will be waiting on the phaser anyway.
   
   But on second thought, that only reduces lock/unlock on timeouts, and it could have weird interactions, with a caller waiting on the phaser while holding the lock and blocking onError/onCompleted. Changing it back.
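The pattern under discussion can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the PR's actual code. `ThrottledReadyObserver`, `ReadySink` (a hypothetical stand-in for grpc's `CallStreamObserver`), the fixed one-second wait step and the exception types are all assumptions. The sketch also assumes some other component, such as an onReady handler, calls `phaser.arrive()` when the stream becomes ready. It keeps the two ideas from the diff: `isReady()` is checked only every `messagesBetweenIsReadyChecks` messages, and the first message is always sent. It also follows the conclusion of the comment above: the lock is held only to check readiness and send, and is released while waiting on the `Phaser`.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: throttled isReady() checks, waiting on the phaser outside the lock. */
final class ThrottledReadyObserver<T> {
  /** Hypothetical stand-in for grpc's CallStreamObserver, reduced to the methods used here. */
  interface ReadySink<T> {
    boolean isReady();
    void onNext(T value);
    void onCompleted();
  }

  private final Object lock = new Object();
  private final Phaser phaser; // assumed to be advanced when the stream becomes ready
  private final ReadySink<T> sink; // guarded by lock
  private final long deadlineSeconds;
  private final int messagesBetweenIsReadyChecks;
  private int messagesSinceReady = 0; // guarded by lock

  ThrottledReadyObserver(
      Phaser phaser, ReadySink<T> sink, long deadlineSeconds, int messagesBetweenIsReadyChecks) {
    this.phaser = phaser;
    this.sink = sink;
    this.deadlineSeconds = deadlineSeconds;
    // At least 1, so the very first message is always sent without an isReady() check.
    this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
  }

  void onNext(T value) {
    long totalSecondsWaited = 0;
    while (true) {
      int phase;
      synchronized (lock) {
        if (messagesSinceReady < messagesBetweenIsReadyChecks) {
          // Still inside the unchecked batch: send without calling isReady().
          sink.onNext(value);
          messagesSinceReady++;
          return;
        }
        // Read the phase before isReady() so a readiness signal arriving right after
        // the check advances past this phase and still wakes the wait below.
        phase = phaser.getPhase();
        if (sink.isReady()) {
          sink.onNext(value);
          messagesSinceReady = 1; // this message starts a new unchecked batch
          return;
        }
      }
      // Not ready: wait with the lock released so onCompleted() is not blocked.
      try {
        if (phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS) < 0) {
          throw new IllegalStateException("Phaser terminated while waiting for readiness");
        }
      } catch (TimeoutException e) {
        totalSecondsWaited += 1;
        if (totalSecondsWaited >= deadlineSeconds) {
          throw new IllegalStateException(
              "Stream not ready after " + totalSecondsWaited + " seconds");
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for readiness", e);
      }
    }
  }

  void onCompleted() {
    synchronized (lock) {
      sink.onCompleted();
    }
  }
}
```

Holding `lock` across `awaitAdvanceInterruptibly` would save one acquire/release per timeout, as the comment notes, but then `onCompleted()` (and likewise an `onError()`) could wait up to `deadlineSeconds` behind a sender that is blocked on readiness.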



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
