Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/29 21:27:09 UTC

[GitHub] [beam] reuvenlax opened a new pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax opened a new pull request #15249:
URL: https://github.com/apache/beam/pull/15249


   This bug has been observed by users of BigQueryIO. This PR modifies GroupIntoBatches to hold up the watermark while elements are being buffered.
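   A minimal, hypothetical sketch of the idea in plain Java (no Beam dependencies): while elements sit in the buffer, the transform reports a watermark hold equal to the smallest buffered element timestamp, so the output watermark cannot move past data that has not been emitted yet. The class and method names are illustrative only and are not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical buffer that holds the watermark at its oldest buffered timestamp (not Beam code). */
class BufferingWatermarkHold {
  private final List<Long> bufferedTimestamps = new ArrayList<>();
  private final int batchSize;

  BufferingWatermarkHold(int batchSize) {
    this.batchSize = batchSize;
  }

  /** Buffers an element timestamp; returns the flushed batch once batchSize is reached, else null. */
  List<Long> add(long elementTimestampMillis) {
    bufferedTimestamps.add(elementTimestampMillis);
    if (bufferedTimestamps.size() >= batchSize) {
      List<Long> batch = new ArrayList<>(bufferedTimestamps);
      bufferedTimestamps.clear(); // flushing releases the hold
      return batch;
    }
    return null;
  }

  /** While elements are buffered, the output watermark must not advance past this value. */
  OptionalLong watermarkHold() {
    return bufferedTimestamps.stream().mapToLong(Long::longValue).min();
  }

  public static void main(String[] args) {
    BufferingWatermarkHold buffer = new BufferingWatermarkHold(3);
    buffer.add(200L);
    buffer.add(100L);
    System.out.println(buffer.watermarkHold()); // OptionalLong[100]
    buffer.add(300L); // third element completes and flushes the batch
    System.out.println(buffer.watermarkHold()); // OptionalLong.empty
  }
}
```

   Without such a hold, a downstream stage could treat buffered elements as late once they are finally emitted, which is the class of problem the PR description attributes to BigQueryIO users.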


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] reuvenlax commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-890280652


   correct





[GitHub] [beam] reuvenlax commented on a change in pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on a change in pull request #15249:
URL: https://github.com/apache/beam/pull/15249#discussion_r684340809



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -340,6 +342,16 @@ public long getElementByteSize() {
     @StateId(NUM_BYTES_IN_BATCH_ID)
     private final StateSpec<CombiningState<Long, long[], Long>> batchSizeBytesSpec;
 
+    private static final String TIMER_TIMESTAMP = "timerTs";
+
+    @StateId(TIMER_TIMESTAMP)
+    private final StateSpec<ValueState<Long>> timerTsSpec;

Review comment:
       Done

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -100,9 +100,9 @@
   Timer withOutputTimestamp(Instant outputTime);
 
   /**
-   * Returns the current relative time used by {@link #setRelative()} and {@link #offset}. This can
-   * be used by a client that self-manages relative timers (e.g. one that stores the current timer
-   * time in a state variable.
+   * * Returns the current relative time used by {@link #setRelative()} and {@link #offset}. This
+   * can be used by a client that self-manages relative timers (e.g. one that stores the current

Review comment:
       Done







[GitHub] [beam] reuvenlax commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-893133895


   friendly ping





[GitHub] [beam] chamikaramj commented on a change in pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

chamikaramj commented on a change in pull request #15249:
URL: https://github.com/apache/beam/pull/15249#discussion_r683803943



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -100,9 +100,9 @@
   Timer withOutputTimestamp(Instant outputTime);
 
   /**
-   * Returns the current relative time used by {@link #setRelative()} and {@link #offset}. This can
-   * be used by a client that self-manages relative timers (e.g. one that stores the current timer
-   * time in a state variable.
+   * * Returns the current relative time used by {@link #setRelative()} and {@link #offset}. This
+   * can be used by a client that self-manages relative timers (e.g. one that stores the current

Review comment:
       Seems like an unintended formatting change?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -398,11 +424,31 @@ public void processElement(
         storedBatchSizeBytes.readLater();
       }
 
-      long num = storedBatchSize.read();
-      if (maxBufferingDuration.isLongerThan(Duration.ZERO) && num == 1) {
-        // This is the first element in batch. Start counting buffering time if a limit was set.
-        bufferingTimer.offset(maxBufferingDuration).setRelative();
+      long num;
+      if (maxBufferingDuration.isLongerThan(Duration.ZERO)) {
+        minBufferedTs.readLater();
+        num = storedBatchSize.read();
+
+        long oldOutputTs =
+            MoreObjects.firstNonNull(
+                minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+        minBufferedTs.add(elementTs.getMillis());
+        // If this is the first element in the batch or if the timer's output timestamp needs
+        // modifying, then set a
+        // timer.
+        if (num == 1 || minBufferedTs.read() != oldOutputTs) {
+          long targetTs =
+              MoreObjects.firstNonNull(
+                  timerTs.read(),
+                  bufferingTimer.getCurrentRelativeTime().getMillis()

Review comment:
       I might be misunderstanding something, but why can't we just set this to "current time" + "maxBufferingDuration"?
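   For readers following the hunk above, a hedged plain-Java model of the decision it makes: the timer is (re)set only for the first element of a batch or when the minimum buffered timestamp (the timer's output timestamp, i.e. the watermark hold) changes, and an already-stored firing time is reused so that re-setting the timer does not push the flush deadline back. The fields are hypothetical stand-ins for the state cells in the diff (storedBatchSize, minBufferedTs, timerTs); the fallback "now + maxBufferingDuration" is an assumption based on the truncated targetTs expression and the reply in this thread.

```java
/**
 * Hypothetical model of the buffering-timer bookkeeping (not Beam code).
 * Fields stand in for the per-key state cells named in the diff.
 */
class BufferingTimerModel {
  private final long maxBufferingMillis;
  private long numBuffered = 0; // models storedBatchSize
  private long minBufferedTs = Long.MAX_VALUE; // models minBufferedTs; MAX_VALUE means "empty"
  private Long timerTs = null; // models timerTs: firing time of the pending timer, if any
  private long timerOutputTs = Long.MAX_VALUE; // watermark hold carried by the pending timer
  private int timerSets = 0; // how many times the timer was (re)set

  BufferingTimerModel(long maxBufferingMillis) {
    this.maxBufferingMillis = maxBufferingMillis;
  }

  void processElement(long elementTs, long nowMillis) {
    numBuffered++;
    long oldOutputTs = minBufferedTs;
    minBufferedTs = Math.min(minBufferedTs, elementTs);
    // (Re)set the timer for the first element of a batch, or when the minimum buffered
    // timestamp, and therefore the timer's output timestamp, has moved.
    if (numBuffered == 1 || minBufferedTs != oldOutputTs) {
      // Keep an already-scheduled firing time; otherwise fire maxBuffering after "now".
      long targetTs = (timerTs != null) ? timerTs : nowMillis + maxBufferingMillis;
      timerTs = targetTs;
      timerOutputTs = minBufferedTs;
      timerSets++;
    }
  }

  /** Models emitting the batch: buffering state and the watermark hold are cleared. */
  void onFlush() {
    numBuffered = 0;
    minBufferedTs = Long.MAX_VALUE;
    timerTs = null;
    timerOutputTs = Long.MAX_VALUE;
  }

  Long timerFiringTime() { return timerTs; } // null when no timer is pending
  long timerOutputTimestamp() { return timerOutputTs; }
  int timerSets() { return timerSets; }
}
```

   In this model, an element with an older timestamp lowers the hold without delaying the flush, while elements that do not change the minimum leave the timer untouched.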

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -340,6 +342,16 @@ public long getElementByteSize() {
     @StateId(NUM_BYTES_IN_BATCH_ID)
     private final StateSpec<CombiningState<Long, long[], Long>> batchSizeBytesSpec;
 
+    private static final String TIMER_TIMESTAMP = "timerTs";
+
+    @StateId(TIMER_TIMESTAMP)
+    private final StateSpec<ValueState<Long>> timerTsSpec;

Review comment:
       Probably we should add comments clarifying what each of these state Ids are for.







[GitHub] [beam] chamikaramj commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

chamikaramj commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-894432430


   Thanks. LGTM.





[GitHub] [beam] reuvenlax commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-891131998


    #15247 is now merged





[GitHub] [beam] reuvenlax commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-894437427


   Run Java_Examples_Dataflow PreCommit





[GitHub] [beam] chamikaramj commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

chamikaramj commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-890220950


   Seems like this is blocked by https://github.com/apache/beam/pull/15247, right?





[GitHub] [beam] reuvenlax commented on a change in pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on a change in pull request #15249:
URL: https://github.com/apache/beam/pull/15249#discussion_r684338356



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -398,11 +424,31 @@ public void processElement(
         storedBatchSizeBytes.readLater();
       }
 
-      long num = storedBatchSize.read();
-      if (maxBufferingDuration.isLongerThan(Duration.ZERO) && num == 1) {
-        // This is the first element in batch. Start counting buffering time if a limit was set.
-        bufferingTimer.offset(maxBufferingDuration).setRelative();
+      long num;
+      if (maxBufferingDuration.isLongerThan(Duration.ZERO)) {
+        minBufferedTs.readLater();
+        num = storedBatchSize.read();
+
+        long oldOutputTs =
+            MoreObjects.firstNonNull(
+                minBufferedTs.read(), BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+        minBufferedTs.add(elementTs.getMillis());
+        // If this is the first element in the batch or if the timer's output timestamp needs
+        // modifying, then set a
+        // timer.
+        if (num == 1 || minBufferedTs.read() != oldOutputTs) {
+          long targetTs =
+              MoreObjects.firstNonNull(
+                  timerTs.read(),
+                  bufferingTimer.getCurrentRelativeTime().getMillis()

Review comment:
       That is what's happening: getCurrentRelativeTime() is essentially the current time. We don't want to query the OS for the current time, because then unit tests won't work. Unit tests (which use TestStream) inject a fake clock so that they can control the advancement of time, and that won't work if we query the OS directly.
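   A minimal sketch of the general pattern behind this reply, using java.time.Clock rather than Beam's own timer machinery: if "current time" comes from an injected clock instead of the OS, a test can substitute a fixed or offset clock and get deterministic deadlines. BufferingDeadline is a hypothetical helper for illustration; in Beam itself the runner supplies processing time to the timer, and TestStream controls how it advances.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

/** Hypothetical helper: computes a flush deadline from an injected clock (not Beam code). */
class BufferingDeadline {
  private final Clock clock;
  private final Duration maxBufferingDuration;

  BufferingDeadline(Clock clock, Duration maxBufferingDuration) {
    this.clock = clock;
    this.maxBufferingDuration = maxBufferingDuration;
  }

  /** "Current time" + maxBufferingDuration, read from the injected clock rather than the OS. */
  Instant deadline() {
    return clock.instant().plus(maxBufferingDuration);
  }

  public static void main(String[] args) {
    // Production code would pass Clock.systemUTC(); a test passes a controllable clock.
    Clock fake = Clock.fixed(Instant.ofEpochMilli(10_000L), ZoneOffset.UTC);
    System.out.println(new BufferingDeadline(fake, Duration.ofSeconds(5)).deadline().toEpochMilli()); // 15000
    // "Advance" fake time deterministically by offsetting the fixed clock.
    Clock later = Clock.offset(fake, Duration.ofSeconds(2));
    System.out.println(new BufferingDeadline(later, Duration.ofSeconds(5)).deadline().toEpochMilli()); // 17000
  }
}
```

   Calling System.currentTimeMillis() inside deadline() would make the second result depend on wall-clock time, which is exactly what the reply says breaks TestStream-based tests.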







[GitHub] [beam] reuvenlax merged pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax merged pull request #15249:
URL: https://github.com/apache/beam/pull/15249


   





[GitHub] [beam] reuvenlax commented on pull request #15249: [BEAM-12690] Fix GroupIntoBatches watermark maintenance

reuvenlax commented on pull request #15249:
URL: https://github.com/apache/beam/pull/15249#issuecomment-892011984


   Run Java PreCommit

