Posted to commits@beam.apache.org by lo...@apache.org on 2022/10/10 19:04:46 UTC

[beam] branch release-2.42.0 updated: fix: only report backlog bytes on data records (#23493) (#23530)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.42.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.42.0 by this push:
     new 926fb53afe2 fix: only report backlog bytes on data records (#23493) (#23530)
926fb53afe2 is described below

commit 926fb53afe27906861895270716d60e013b8960e
Author: nancyxu123 <na...@gmail.com>
AuthorDate: Mon Oct 10 12:04:37 2022 -0700

    fix: only report backlog bytes on data records (#23493) (#23530)
    
    Co-authored-by: Thiago Nunes <th...@google.com>
---
 .../action/DataChangeRecordAction.java             | 12 +++++-
 .../action/QueryChangeStreamAction.java            | 10 +----
 .../action/DataChangeRecordActionTest.java         | 12 +++++-
 .../action/QueryChangeStreamActionTest.java        | 45 ++++++++++++++++++----
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |  2 +-
 5 files changed, 61 insertions(+), 20 deletions(-)
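
In short, this commit moves the per-record throughput accounting out of QueryChangeStreamAction and into DataChangeRecordAction, so heartbeat and child-partition records no longer contribute to the reported backlog bytes. The sketch below illustrates the shape of that change only; ThroughputEstimator, DataRecord, and processDataRecord here are simplified stand-ins, not the Beam classes in the diff that follows. The actual commit uses Guava's Utf8.encodedLength to get the UTF-8 length without allocating an intermediate byte array.

    import java.nio.charset.StandardCharsets;
    import java.time.Instant;

    // Hypothetical stand-ins for the Beam types touched by this commit.
    interface ThroughputEstimator {
      // Records that `bytes` worth of output was produced at `timestamp`.
      void update(Instant timestamp, long bytes);
    }

    final class DataRecord {
      private final String payload;
      DataRecord(String payload) { this.payload = payload; }
      @Override public String toString() { return payload; }
    }

    final class DataChangeRecordActionSketch {
      // After this change, only data records update the estimator; heartbeat and
      // child-partition records are handled by other actions and are not counted.
      static void processDataRecord(DataRecord record, ThroughputEstimator estimator) {
        // The record "size" is approximated by the UTF-8 encoded length of its
        // string representation; this is an estimate, not an exact byte count.
        long estimatedBytes = record.toString().getBytes(StandardCharsets.UTF_8).length;
        estimator.update(Instant.now(), estimatedBytes);
      }

      public static void main(String[] args) {
        processDataRecord(
            new DataRecord("example-change-record"),
            (timestamp, bytes) -> System.out.println(timestamp + ": +" + bytes + " bytes"));
      }
    }
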

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
index 2446a6c9316..f806c7fcb74 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordAction.java
@@ -22,12 +22,14 @@ import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Utf8;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +68,8 @@ public class DataChangeRecordAction {
    *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
    * @param watermarkEstimator the watermark estimator of the {@link
    *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn} SDF
+   * @param throughputEstimator an estimator to calculate local throughput of the {@link
+   *     org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn}.
    * @return {@link Optional#empty()} if the caller can continue processing more records. A non
    *     empty {@link Optional} with {@link ProcessContinuation#stop()} if this function was unable
    *     to claim the {@link ChildPartitionsRecord} timestamp
@@ -76,7 +80,8 @@ public class DataChangeRecordAction {
       DataChangeRecord record,
       RestrictionTracker<TimestampRange, Timestamp> tracker,
       OutputReceiver<DataChangeRecord> outputReceiver,
-      ManualWatermarkEstimator<Instant> watermarkEstimator) {
+      ManualWatermarkEstimator<Instant> watermarkEstimator,
+      ThroughputEstimator throughputEstimator) {
 
     final String token = partition.getPartitionToken();
     LOG.debug("[" + token + "] Processing data record " + record.getCommitTimestamp());
@@ -91,6 +96,11 @@ public class DataChangeRecordAction {
     outputReceiver.outputWithTimestamp(record, commitInstant);
     watermarkEstimator.setWatermark(commitInstant);
 
+    // The size of a record is represented by the number of bytes needed for the
+    // string representation of the record. Here, we only try to achieve an estimate
+    // instead of an accurate throughput.
+    throughputEstimator.update(Timestamp.now(), Utf8.encodedLength(record.toString()));
+
     LOG.debug("[" + token + "] Data record action completed successfully");
     return Optional.empty();
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 6afb57c6e94..4265d1356ab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.ErrorCode;
 import com.google.cloud.spanner.SpannerException;
-import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
@@ -189,7 +188,8 @@ public class QueryChangeStreamAction {
                     (DataChangeRecord) record,
                     tracker,
                     receiver,
-                    watermarkEstimator);
+                    watermarkEstimator,
+                    throughputEstimator);
           } else if (record instanceof HeartbeatRecord) {
             maybeContinuation =
                 heartbeatRecordAction.run(
@@ -203,12 +203,6 @@ public class QueryChangeStreamAction {
             throw new IllegalArgumentException("Unknown record type " + record.getClass());
           }
 
-          // The size of a record is represented by the number of bytes needed for the
-          // string representation of the record. Here, we only try to achieve an estimate
-          // instead of an accurate throughput.
-          this.throughputEstimator.update(
-              Timestamp.now(), record.toString().getBytes(StandardCharsets.UTF_8).length);
-
           if (maybeContinuation.isPresent()) {
             LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
             bundleFinalizer.afterBundleCommit(
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
index bea3a670d3f..97cd7c647b9 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -28,6 +29,7 @@ import com.google.cloud.Timestamp;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
@@ -44,6 +46,7 @@ public class DataChangeRecordActionTest {
   private RestrictionTracker<TimestampRange, Timestamp> tracker;
   private OutputReceiver<DataChangeRecord> outputReceiver;
   private ManualWatermarkEstimator<Instant> watermarkEstimator;
+  private ThroughputEstimator throughputEstimator;
 
   @Before
   public void setUp() {
@@ -52,6 +55,7 @@ public class DataChangeRecordActionTest {
     tracker = mock(RestrictionTracker.class);
     outputReceiver = mock(OutputReceiver.class);
     watermarkEstimator = mock(ManualWatermarkEstimator.class);
+    throughputEstimator = mock(ThroughputEstimator.class);
   }
 
   @Test
@@ -65,11 +69,13 @@ public class DataChangeRecordActionTest {
     when(partition.getPartitionToken()).thenReturn(partitionToken);
 
     final Optional<ProcessContinuation> maybeContinuation =
-        action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+        action.run(
+            partition, record, tracker, outputReceiver, watermarkEstimator, throughputEstimator);
 
     assertEquals(Optional.empty(), maybeContinuation);
     verify(outputReceiver).outputWithTimestamp(record, instant);
     verify(watermarkEstimator).setWatermark(instant);
+    verify(throughputEstimator).update(any(Timestamp.class), anyLong());
   }
 
   @Test
@@ -82,10 +88,12 @@ public class DataChangeRecordActionTest {
     when(partition.getPartitionToken()).thenReturn(partitionToken);
 
     final Optional<ProcessContinuation> maybeContinuation =
-        action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+        action.run(
+            partition, record, tracker, outputReceiver, watermarkEstimator, throughputEstimator);
 
     assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
     verify(outputReceiver, never()).outputWithTimestamp(any(), any());
     verify(watermarkEstimator, never()).setWatermark(any());
+    verify(throughputEstimator, never()).update(any(Timestamp.class), anyLong());
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 4f83aa5e827..1d9f7831061 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
 import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -148,10 +149,20 @@ public class QueryChangeStreamActionTest {
     when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
         .thenReturn(Arrays.asList(record1, record2));
     when(dataChangeRecordAction.run(
-            partition, record1, restrictionTracker, outputReceiver, watermarkEstimator))
+            partition,
+            record1,
+            restrictionTracker,
+            outputReceiver,
+            watermarkEstimator,
+            throughputEstimator))
         .thenReturn(Optional.empty());
     when(dataChangeRecordAction.run(
-            partition, record2, restrictionTracker, outputReceiver, watermarkEstimator))
+            partition,
+            record2,
+            restrictionTracker,
+            outputReceiver,
+            watermarkEstimator,
+            throughputEstimator))
         .thenReturn(Optional.of(ProcessContinuation.stop()));
     when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
 
@@ -161,14 +172,28 @@ public class QueryChangeStreamActionTest {
 
     assertEquals(ProcessContinuation.stop(), result);
     verify(dataChangeRecordAction)
-        .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator);
+        .run(
+            partition,
+            record1,
+            restrictionTracker,
+            outputReceiver,
+            watermarkEstimator,
+            throughputEstimator);
     verify(dataChangeRecordAction)
-        .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator);
+        .run(
+            partition,
+            record2,
+            restrictionTracker,
+            outputReceiver,
+            watermarkEstimator,
+            throughputEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
 
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
+    // This is done in the DataChangeRecordAction, but not here
+    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -207,9 +232,10 @@ public class QueryChangeStreamActionTest {
     verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
+    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -252,9 +278,10 @@ public class QueryChangeStreamActionTest {
         .run(partition, record2, restrictionTracker, watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
+    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -298,9 +325,10 @@ public class QueryChangeStreamActionTest {
         .run(partition, record2, restrictionTracker, watermarkEstimator);
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(restrictionTracker, never()).tryClaim(any());
+    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   @Test
@@ -324,9 +352,10 @@ public class QueryChangeStreamActionTest {
     verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
     verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+    verify(throughputEstimator, never()).update(any(), anyLong());
   }
 
   private static class BundleFinalizerStub implements BundleFinalizer {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 6c791b706f3..91f87c07154 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -149,7 +149,7 @@ public class ReadChangeStreamPartitionDoFnTest {
     verify(queryChangeStreamAction)
         .run(partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
 
-    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+    verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any(), any());
     verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
     verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
     verify(tracker, never()).tryClaim(any());