You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/10 19:04:46 UTC
[beam] branch release-2.42.0 updated: fix: only report backlog bytes on data records (#23493) (#23530)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch release-2.42.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.42.0 by this push:
new 926fb53afe2 fix: only report backlog bytes on data records (#23493) (#23530)
926fb53afe2 is described below
commit 926fb53afe27906861895270716d60e013b8960e
Author: nancyxu123 <na...@gmail.com>
AuthorDate: Mon Oct 10 12:04:37 2022 -0700
fix: only report backlog bytes on data records (#23493) (#23530)
Co-authored-by: Thiago Nunes <th...@google.com>
---
.../action/DataChangeRecordAction.java | 12 +++++-
.../action/QueryChangeStreamAction.java | 10 +----
.../action/DataChangeRecordActionTest.java | 12 +++++-
.../action/QueryChangeStreamActionTest.java | 45 ++++++++++++++++++----
.../dofn/ReadChangeStreamPartitionDoFnTest.java | 2 +-
5 files changed, 61 insertions(+), 20 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
index 2446a6c9316..f806c7fcb74 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
@@ -22,12 +22,14 @@ import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +68,8 @@ public class DataChangeRecordAction {
* org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
* @param watermarkEstimator the watermark estimator of the {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+ * @param throughputEstimator an estimator to calculate local throughput of the {@link
+ * org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}.
* @return {@link Optional#empty()} if the caller can continue processing more records. A non
* empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable
* to claim the {@link DataChangeRecord} commit timestamp
@@ -76,7 +80,8 @@ public class DataChangeRecordAction {
DataChangeRecord record,
RestrictionTracker<TimestampRange, Timestamp> tracker,
OutputReceiver<DataChangeRecord> outputReceiver,
- ManualWatermarkEstimator<Instant> watermarkEstimator) {
+ ManualWatermarkEstimator<Instant> watermarkEstimator,
+ ThroughputEstimator throughputEstimator) {
final String token = partition.getPartitionToken();
LOG.debug("[" + token + "] Processing data record " + record.getCommitTimestamp());
@@ -91,6 +96,11 @@ public class DataChangeRecordAction {
outputReceiver.outputWithTimestamp(record, commitInstant);
watermarkEstimator.setWatermark(commitInstant);
+ // The size of a record is approximated by the number of UTF-8 bytes in the
+ // string representation of the record. This is only an estimate of the
+ // throughput, not an exact measurement.
+ throughputEstimator.update(Timestamp.now(), Utf8.encodedLength(record.toString()));
+
LOG.debug("[" + token + "] Data record action completed successfully");
return Optional.empty();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 6afb57c6e94..4265d1356ab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
-import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
@@ -189,7 +188,8 @@ public class QueryChangeStreamAction {
(DataChangeRecord) record,
tracker,
receiver,
- watermarkEstimator);
+ watermarkEstimator,
+ throughputEstimator);
} else if (record instanceof HeartbeatRecord) {
maybeContinuation =
heartbeatRecordAction.run(
@@ -203,12 +203,6 @@ public class QueryChangeStreamAction {
throw new IllegalArgumentException("Unknown record type " + record.getClass());
}
- // The size of a record is represented by the number of bytes needed for the
- // string representation of the record. Here, we only try to achieve an estimate
- // instead of an accurate throughput.
- this.throughputEstimator.update(
- Timestamp.now(), record.toString().getBytes(StandardCharsets.UTF_8).length);
-
if (maybeContinuation.isPresent()) {
LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
bundleFinalizer.afterBundleCommit(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
index bea3a670d3f..97cd7c647b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -28,6 +29,7 @@ import com.google.cloud.Timestamp;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -44,6 +46,7 @@ public class DataChangeRecordActionTest {
private RestrictionTracker<TimestampRange, Timestamp> tracker;
private OutputReceiver<DataChangeRecord> outputReceiver;
private ManualWatermarkEstimator<Instant> watermarkEstimator;
+ private ThroughputEstimator throughputEstimator;
@Before
public void setUp() {
@@ -52,6 +55,7 @@ public class DataChangeRecordActionTest {
tracker = mock(RestrictionTracker.class);
outputReceiver = mock(OutputReceiver.class);
watermarkEstimator = mock(ManualWatermarkEstimator.class);
+ throughputEstimator = mock(ThroughputEstimator.class);
}
@Test
@@ -65,11 +69,13 @@ public class DataChangeRecordActionTest {
when(partition.getPartitionToken()).thenReturn(partitionToken);
final Optional<ProcessContinuation> maybeContinuation =
- action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+ action.run(
+ partition, record, tracker, outputReceiver, watermarkEstimator, throughputEstimator);
assertEquals(Optional.empty(), maybeContinuation);
verify(outputReceiver).outputWithTimestamp(record, instant);
verify(watermarkEstimator).setWatermark(instant);
+ verify(throughputEstimator).update(any(Timestamp.class), anyLong());
}
@Test
@@ -82,10 +88,12 @@ public class DataChangeRecordActionTest {
when(partition.getPartitionToken()).thenReturn(partitionToken);
final Optional<ProcessContinuation> maybeContinuation =
- action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+ action.run(
+ partition, record, tracker, outputReceiver, watermarkEstimator, throughputEstimator);
assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
verify(outputReceiver, never()).outputWithTimestamp(any(), any());
verify(watermarkEstimator, never()).setWatermark(any());
+ verify(throughputEstimator, never()).update(any(Timestamp.class), anyLong());
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 4f83aa5e827..1d9f7831061 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -148,10 +149,20 @@ public class QueryChangeStreamActionTest {
when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
.thenReturn(Arrays.asList(record1, record2));
when(dataChangeRecordAction.run(
- partition, record1, restrictionTracker, outputReceiver, watermarkEstimator))
+ partition,
+ record1,
+ restrictionTracker,
+ outputReceiver,
+ watermarkEstimator,
+ throughputEstimator))
.thenReturn(Optional.empty());
when(dataChangeRecordAction.run(
- partition, record2, restrictionTracker, outputReceiver, watermarkEstimator))
+ partition,
+ record2,
+ restrictionTracker,
+ outputReceiver,
+ watermarkEstimator,
+ throughputEstimator))
.thenReturn(Optional.of(ProcessContinuation.stop()));
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
@@ -161,14 +172,28 @@ public class QueryChangeStreamActionTest {
assertEquals(ProcessContinuation.stop(), result);
verify(dataChangeRecordAction)
- .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator);
+ .run(
+ partition,
+ record1,
+ restrictionTracker,
+ outputReceiver,
+ watermarkEstimator,
+ throughputEstimator);
verify(dataChangeRecordAction)
- .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator);
+ .run(
+ partition,
+ record2,
+ restrictionTracker,
+ outputReceiver,
+ watermarkEstimator,
+ throughputEstimator);
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
verify(restrictionTracker, never()).tryClaim(any());
+ // Throughput is updated by DataChangeRecordAction (mocked here), not by this action
+ verify(throughputEstimator, never()).update(any(), anyLong());
}
@Test
@@ -207,9 +232,10 @@ public class QueryChangeStreamActionTest {
verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator);
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
- verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
verify(restrictionTracker, never()).tryClaim(any());
+ verify(throughputEstimator, never()).update(any(), anyLong());
}
@Test
@@ -252,9 +278,10 @@ public class QueryChangeStreamActionTest {
.run(partition, record2, restrictionTracker, watermarkEstimator);
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
- verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
verify(restrictionTracker, never()).tryClaim(any());
+ verify(throughputEstimator, never()).update(any(), anyLong());
}
@Test
@@ -298,9 +325,10 @@ public class QueryChangeStreamActionTest {
.run(partition, record2, restrictionTracker, watermarkEstimator);
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
- verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
verify(restrictionTracker, never()).tryClaim(any());
+ verify(throughputEstimator, never()).update(any(), anyLong());
}
@Test
@@ -324,9 +352,10 @@ public class QueryChangeStreamActionTest {
verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
- verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+ verify(throughputEstimator, never()).update(any(), anyLong());
}
private static class BundleFinalizerStub implements BundleFinalizer {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 6c791b706f3..91f87c07154 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -149,7 +149,7 @@ public class ReadChangeStreamPartitionDoFnTest {
verify(queryChangeStreamAction)
.run(partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
- verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
verify(tracker, never()).tryClaim(any());