You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/29 06:04:42 UTC
[beam] branch master updated: Merge pull request #17443 from [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d42baa25ef Merge pull request #17443 from [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
7d42baa25ef is described below
commit 7d42baa25ef39dc0efdc0a7c55eebc24c010b116
Author: Hengfeng Li <he...@google.com>
AuthorDate: Fri Apr 29 16:04:33 2022 +1000
Merge pull request #17443 from [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
* [BEAM-12164]: use the end timestamp for progress estimation in batch jobs
* Use Timestamp.now() to update the throughput estimator.
* Fix progress estimation for the streaming case, where range.getTo() is
Timestamp.MAX_VALUE: use the current time as the end timestamp instead.
* Fix the unit test.
---
.../action/QueryChangeStreamAction.java | 3 +--
.../restriction/TimestampRangeTracker.java | 16 +++++++++---
.../restriction/TimestampRangeTrackerTest.java | 30 ++++++++++++++--------
3 files changed, 33 insertions(+), 16 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 96791572d00..57e3502fcc2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -223,8 +223,7 @@ public class QueryChangeStreamAction {
// string representation of the record. Here, we only try to achieve an estimate
// instead of an accurate throughput.
this.throughputEstimator.update(
- record.getRecordTimestamp(),
- record.toString().getBytes(StandardCharsets.UTF_8).length);
+ Timestamp.now(), record.toString().getBytes(StandardCharsets.UTF_8).length);
if (maybeContinuation.isPresent()) {
LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
index d17d691cdc8..2642d902339 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
@@ -204,7 +204,15 @@ public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Ti
*/
@Override
public Progress getProgress() {
- final BigDecimal now = BigDecimal.valueOf(timeSupplier.get().getSeconds());
+ BigDecimal end;
+ if (range.getTo().compareTo(Timestamp.MAX_VALUE) == 0) {
+ // When the given end timestamp equals to Timestamp.MAX_VALUE, this means that
+ // the end timestamp is not specified which should be a streaming job. So we
+ // use now() as the end timestamp.
+ end = BigDecimal.valueOf(timeSupplier.get().getSeconds());
+ } else {
+ end = BigDecimal.valueOf(range.getTo().getSeconds());
+ }
BigDecimal current;
if (lastClaimedPosition == null) {
current = BigDecimal.valueOf(range.getFrom().getSeconds());
@@ -213,13 +221,13 @@ public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Ti
}
// The remaining work must be greater than 0. Otherwise, it will cause an issue
// that the watermark does not advance.
- final BigDecimal workRemaining = now.subtract(current).max(BigDecimal.ONE);
+ final BigDecimal workRemaining = end.subtract(current).max(BigDecimal.ONE);
LOG.debug(
"Reported progress - current:"
+ current.doubleValue()
- + " now:"
- + now.doubleValue()
+ + " end:"
+ + end.doubleValue()
+ " workRemaining:"
+ workRemaining.doubleValue());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
index 5a016164849..b57acab8e3e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
@@ -232,13 +232,12 @@ public class TimestampRangeTrackerTest {
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
- tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0));
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();
assertEquals(position.getSeconds(), progress.getWorkCompleted(), DELTA);
- assertEquals(10D, progress.getWorkRemaining(), DELTA);
+ assertEquals(to.getSeconds() - position.getSeconds(), progress.getWorkRemaining(), DELTA);
}
@Test
@@ -248,8 +247,6 @@ public class TimestampRangeTrackerTest {
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
- tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
-
final Progress progress = tracker.getProgress();
assertEquals(0D, progress.getWorkCompleted(), DELTA);
assertEquals(to.getSeconds(), progress.getWorkRemaining(), DELTA);
@@ -263,8 +260,6 @@ public class TimestampRangeTrackerTest {
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
- tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
-
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();
@@ -275,13 +270,12 @@ public class TimestampRangeTrackerTest {
}
@Test
- public void testGetProgressReturnsWorkCompletedAsOneWhenRangeEndHasBeenAttempted() {
+ public void testGetProgressReturnsWorkCompletedWhenRangeEndHasBeenAttempted() {
final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
- tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(100, 0));
tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(101, 0));
final Progress progress = tracker.getProgress();
@@ -293,14 +287,13 @@ public class TimestampRangeTrackerTest {
}
@Test
- public void testGetProgressReturnsWorkCompletedAsOneWhenPastRangeEndHasBeenAttempted() {
+ public void testGetProgressReturnsWorkCompletedWhenPastRangeEndHasBeenAttempted() {
final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0);
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
- tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds(), 0));
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();
@@ -309,4 +302,21 @@ public class TimestampRangeTrackerTest {
assertTrue(progress.getWorkRemaining() >= 0);
assertEquals(101D, progress.getWorkRemaining(), DELTA);
}
+
+ @Test
+ public void testGetProgressForStreaming() {
+ final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
+ final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0);
+ final TimestampRange range = TimestampRange.of(from, Timestamp.MAX_VALUE);
+ final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+
+ tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0));
+ tracker.tryClaim(position);
+ final Progress progress = tracker.getProgress();
+
+ assertTrue(progress.getWorkCompleted() >= 0);
+ assertEquals(101D, progress.getWorkCompleted(), DELTA);
+ assertTrue(progress.getWorkRemaining() >= 0);
+ assertEquals(10D, progress.getWorkRemaining(), DELTA);
+ }
}