You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/29 06:04:42 UTC

[beam] branch master updated: Merge pull request #17443 from [BEAM-12164]: use the end timestamp for progress estimation in batch jobs

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d42baa25ef Merge pull request #17443 from [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
7d42baa25ef is described below

commit 7d42baa25ef39dc0efdc0a7c55eebc24c010b116
Author: Hengfeng Li <he...@google.com>
AuthorDate: Fri Apr 29 16:04:33 2022 +1000

    Merge pull request #17443 from [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
    
    * [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
    
    * Use Timestamp.now() to update the throughput estimator.
    
    * Fix the streaming case: when range.getTo() equals Timestamp.MAX_VALUE
    (no end timestamp specified), use now() as the end timestamp instead.
    
    * Update the unit tests accordingly.
---
 .../action/QueryChangeStreamAction.java            |  3 +--
 .../restriction/TimestampRangeTracker.java         | 16 +++++++++---
 .../restriction/TimestampRangeTrackerTest.java     | 30 ++++++++++++++--------
 3 files changed, 33 insertions(+), 16 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 96791572d00..57e3502fcc2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -223,8 +223,7 @@ public class QueryChangeStreamAction {
             // string representation of the record. Here, we only try to achieve an estimate
             // instead of an accurate throughput.
             this.throughputEstimator.update(
-                record.getRecordTimestamp(),
-                record.toString().getBytes(StandardCharsets.UTF_8).length);
+                Timestamp.now(), record.toString().getBytes(StandardCharsets.UTF_8).length);
 
             if (maybeContinuation.isPresent()) {
               LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
index d17d691cdc8..2642d902339 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
@@ -204,7 +204,15 @@ public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Ti
    */
   @Override
   public Progress getProgress() {
-    final BigDecimal now = BigDecimal.valueOf(timeSupplier.get().getSeconds());
+    BigDecimal end;
+    if (range.getTo().compareTo(Timestamp.MAX_VALUE) == 0) {
+      // An end timestamp equal to Timestamp.MAX_VALUE means that no end timestamp
+      // was specified, which indicates a streaming job. In that case we use now()
+      // as the end timestamp.
+      end = BigDecimal.valueOf(timeSupplier.get().getSeconds());
+    } else {
+      end = BigDecimal.valueOf(range.getTo().getSeconds());
+    }
     BigDecimal current;
     if (lastClaimedPosition == null) {
       current = BigDecimal.valueOf(range.getFrom().getSeconds());
@@ -213,13 +221,13 @@ public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Ti
     }
     // The remaining work must be greater than 0. Otherwise, it will cause an issue
     // that the watermark does not advance.
-    final BigDecimal workRemaining = now.subtract(current).max(BigDecimal.ONE);
+    final BigDecimal workRemaining = end.subtract(current).max(BigDecimal.ONE);
 
     LOG.debug(
         "Reported progress - current:"
             + current.doubleValue()
-            + " now:"
-            + now.doubleValue()
+            + " end:"
+            + end.doubleValue()
             + " workRemaining:"
             + workRemaining.doubleValue());
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
index 5a016164849..b57acab8e3e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
@@ -232,13 +232,12 @@ public class TimestampRangeTrackerTest {
 
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
-    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0));
 
     tracker.tryClaim(position);
     final Progress progress = tracker.getProgress();
 
     assertEquals(position.getSeconds(), progress.getWorkCompleted(), DELTA);
-    assertEquals(10D, progress.getWorkRemaining(), DELTA);
+    assertEquals(to.getSeconds() - position.getSeconds(), progress.getWorkRemaining(), DELTA);
   }
 
   @Test
@@ -248,8 +247,6 @@ public class TimestampRangeTrackerTest {
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
-    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
-
     final Progress progress = tracker.getProgress();
     assertEquals(0D, progress.getWorkCompleted(), DELTA);
     assertEquals(to.getSeconds(), progress.getWorkRemaining(), DELTA);
@@ -263,8 +260,6 @@ public class TimestampRangeTrackerTest {
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
-    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
-
     tracker.tryClaim(position);
     final Progress progress = tracker.getProgress();
 
@@ -275,13 +270,12 @@ public class TimestampRangeTrackerTest {
   }
 
   @Test
-  public void testGetProgressReturnsWorkCompletedAsOneWhenRangeEndHasBeenAttempted() {
+  public void testGetProgressReturnsWorkCompletedWhenRangeEndHasBeenAttempted() {
     final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
     final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
-    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
     tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(100, 0));
     tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(101, 0));
     final Progress progress = tracker.getProgress();
@@ -293,14 +287,13 @@ public class TimestampRangeTrackerTest {
   }
 
   @Test
-  public void testGetProgressReturnsWorkCompletedAsOneWhenPastRangeEndHasBeenAttempted() {
+  public void testGetProgressReturnsWorkCompletedWhenPastRangeEndHasBeenAttempted() {
     final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
     final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
     final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0);
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
-    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds(), 0));
     tracker.tryClaim(position);
     final Progress progress = tracker.getProgress();
 
@@ -309,4 +302,21 @@ public class TimestampRangeTrackerTest {
     assertTrue(progress.getWorkRemaining() >= 0);
     assertEquals(101D, progress.getWorkRemaining(), DELTA);
   }
+
+  @Test
+  public void testGetProgressForStreaming() {
+    final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
+    final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0);
+    final TimestampRange range = TimestampRange.of(from, Timestamp.MAX_VALUE);
+    final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+
+    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0));
+    tracker.tryClaim(position);
+    final Progress progress = tracker.getProgress();
+
+    assertTrue(progress.getWorkCompleted() >= 0);
+    assertEquals(101D, progress.getWorkCompleted(), DELTA);
+    assertTrue(progress.getWorkRemaining() >= 0);
+    assertEquals(10D, progress.getWorkRemaining(), DELTA);
+  }
 }