Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/14 18:53:56 UTC

[beam] branch master updated: Merge pull request #17200 from [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a33871e6b2 Merge pull request #17200 from [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector
8a33871e6b2 is described below

commit 8a33871e6b288db445fa1a0267d863a56eba0ae6
Author: Hengfeng Li <he...@google.com>
AuthorDate: Fri Apr 15 04:53:47 2022 +1000

    Merge pull request #17200 from [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector
    
    * fix: fix incorrect backlog of restriction tracker
    
    * Clean up the code of BEAM-14194.
    
    * Enable spanner change streams tests BEAM-14277.
    
    * Add a multiplier for the reported backlog size.
    
    * Use BigDecimal inside the throughput estimator.
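
The new estimation can be summarized as: reported backlog bytes = seconds of
remaining work * estimated bytes/sec * multiplier. A minimal worked sketch with
hypothetical numbers:

    // Hypothetical illustration of the backlog model introduced by this change.
    double workRemainingSeconds = 120;       // partition is 2 minutes behind now
    double throughputBytesPerSec = 50_000;   // rolling average from ThroughputEstimator
    double multiplier = 2.0;                 // AUTOSCALING_SIZE_MULTIPLIER
    double backlogBytes = workRemainingSeconds * throughputBytesPerSec * multiplier;
    // backlogBytes == 12_000_000, i.e. roughly 12 MB of reported backlog.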
---
 .../dataflow/DataflowPipelineTranslator.java       |  27 -----
 .../dataflow/DataflowPipelineTranslatorTest.java   |  67 -----------
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java  |   5 +-
 .../changestreams/action/ActionFactory.java        |   8 +-
 .../action/QueryChangeStreamAction.java            |  17 ++-
 .../dofn/ReadChangeStreamPartitionDoFn.java        |  31 ++++-
 .../restriction/ThroughputEstimator.java           | 109 ++++++++++++++++++
 .../restriction/TimestampRangeTracker.java         |  67 ++++++-----
 .../action/QueryChangeStreamActionTest.java        |   6 +-
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |   9 +-
 .../changestreams/it/SpannerChangeStreamIT.java    |   2 -
 ...SpannerChangeStreamTransactionBoundariesIT.java |   2 -
 .../restriction/ThroughputEstimatorTest.java       | 126 +++++++++++++++++++++
 .../restriction/TimestampRangeTrackerTest.java     |  44 ++++---
 14 files changed, 364 insertions(+), 156 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 41134adc3d0..b46cedb4297 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -63,7 +63,6 @@ import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
 import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
 import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -74,7 +73,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -407,8 +405,6 @@ public class DataflowPipelineTranslator {
         workerPool.setDiskSizeGb(options.getDiskSizeGb());
       }
       AutoscalingSettings settings = new AutoscalingSettings();
-      // TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream
-      assertSpannerChangeStreamsNoAutoScaling(options);
       if (options.getAutoscalingAlgorithm() != null) {
         settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
       }
@@ -608,29 +604,6 @@ public class DataflowPipelineTranslator {
         return parents.peekFirst().toAppliedPTransform(getPipeline());
       }
     }
-
-    // TODO: Remove this once the autoscaling is supported for Spanner change streams
-    private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) {
-      if (isSpannerChangeStream(options) && !isAutoScalingAlgorithmNone(options)) {
-        throw new IllegalArgumentException(
-            "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE.");
-      }
-    }
-
-    private boolean isSpannerChangeStream(DataflowPipelineOptions options) {
-      try {
-        final SpannerChangeStreamOptions spannerOptions =
-            options.as(SpannerChangeStreamOptions.class);
-        final String metadataTable = spannerOptions.getMetadataTable();
-        return metadataTable != null && !metadataTable.isEmpty();
-      } catch (Exception e) {
-        return false;
-      }
-    }
-
-    private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) {
-      return AutoscalingAlgorithmType.NONE.equals(options.getAutoscalingAlgorithm());
-    }
   }
 
   static class StepTranslator implements StepTranslationContext {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 58ad2a49dde..82297b9784a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -67,7 +67,6 @@ import org.apache.beam.runners.core.construction.SdkComponents;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
@@ -85,7 +84,6 @@ import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -428,71 +426,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
             .intValue());
   }
 
-  @Test
-  public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException {
-    final DataflowPipelineOptions options = buildPipelineOptions();
-    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
-    options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
-    final Pipeline p = buildPipeline(options);
-    final SdkComponents sdkComponents = createSdkComponents(options);
-    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
-
-    final JobSpecification jobSpecification =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p,
-                pipelineProto,
-                sdkComponents,
-                DataflowRunner.fromOptions(options),
-                Collections.emptyList());
-    assertNotNull(jobSpecification);
-  }
-
-  @Test
-  public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone()
-      throws IOException {
-    final DataflowPipelineOptions options = buildPipelineOptions();
-    options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
-    options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
-    final Pipeline p = buildPipeline(options);
-    final SdkComponents sdkComponents = createSdkComponents(options);
-    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
-    DataflowPipelineTranslator.fromOptions(options)
-        .translate(
-            p,
-            pipelineProto,
-            sdkComponents,
-            DataflowRunner.fromOptions(options),
-            Collections.emptyList());
-  }
-
-  @Test
-  public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified()
-      throws IOException {
-    final DataflowPipelineOptions options = buildPipelineOptions();
-    options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
-    final Pipeline p = buildPipeline(options);
-    final SdkComponents sdkComponents = createSdkComponents(options);
-    final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(
-        "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
-    final JobSpecification jobSpecification =
-        DataflowPipelineTranslator.fromOptions(options)
-            .translate(
-                p,
-                pipelineProto,
-                sdkComponents,
-                DataflowRunner.fromOptions(options),
-                Collections.emptyList());
-    assertNotNull(jobSpecification);
-  }
-
   @Test
   public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 95af7ff912a..66a9ad1011c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.PostProcessingMetri
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -1595,6 +1596,7 @@ public class SpannerIO {
                 : getInclusiveEndAt();
         final MapperFactory mapperFactory = new MapperFactory();
         final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
+        final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
         final RpcPriority rpcPriority =
             MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
         final DaoFactory daoFactory =
@@ -1612,7 +1614,8 @@ public class SpannerIO {
         final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
             new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
         final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
-            new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+            new ReadChangeStreamPartitionDoFn(
+                daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
         final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
             new PostProcessingMetricsDoFn(metrics);
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index cf778343cb3..cca8506d0e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.joda.time.Duration;
 
 /**
@@ -108,6 +109,7 @@ public class ActionFactory implements Serializable {
    * @param childPartitionsRecordAction action class to process {@link
    *     org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
    * @param metrics metrics gathering class
+   * @param throughputEstimator an estimator to calculate local throughput.
    * @return single instance of the {@link QueryChangeStreamAction}
    */
   public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -118,7 +120,8 @@ public class ActionFactory implements Serializable {
       DataChangeRecordAction dataChangeRecordAction,
       HeartbeatRecordAction heartbeatRecordAction,
       ChildPartitionsRecordAction childPartitionsRecordAction,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      ThroughputEstimator throughputEstimator) {
     if (queryChangeStreamActionInstance == null) {
       queryChangeStreamActionInstance =
           new QueryChangeStreamAction(
@@ -129,7 +132,8 @@ public class ActionFactory implements Serializable {
               dataChangeRecordAction,
               heartbeatRecordAction,
               childPartitionsRecordAction,
-              metrics);
+              metrics,
+              throughputEstimator);
     }
     return queryChangeStreamActionInstance;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index ceaa68b63e9..96791572d00 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -26,6 +26,7 @@ import io.opencensus.common.Scope;
 import io.opencensus.trace.AttributeValue;
 import io.opencensus.trace.Tracer;
 import io.opencensus.trace.Tracing;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Optional;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRec
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -79,6 +81,7 @@ public class QueryChangeStreamAction {
   private final HeartbeatRecordAction heartbeatRecordAction;
   private final ChildPartitionsRecordAction childPartitionsRecordAction;
   private final ChangeStreamMetrics metrics;
+  private final ThroughputEstimator throughputEstimator;
 
   /**
    * Constructs an action class for performing a change stream query for a given partition.
@@ -93,6 +96,7 @@ public class QueryChangeStreamAction {
    * @param heartbeatRecordAction action class to process {@link HeartbeatRecord}s
    * @param childPartitionsRecordAction action class to process {@link ChildPartitionsRecord}s
    * @param metrics metrics gathering class
+   * @param throughputEstimator an estimator to calculate local throughput.
    */
   QueryChangeStreamAction(
       ChangeStreamDao changeStreamDao,
@@ -102,7 +106,8 @@ public class QueryChangeStreamAction {
       DataChangeRecordAction dataChangeRecordAction,
       HeartbeatRecordAction heartbeatRecordAction,
       ChildPartitionsRecordAction childPartitionsRecordAction,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      ThroughputEstimator throughputEstimator) {
     this.changeStreamDao = changeStreamDao;
     this.partitionMetadataDao = partitionMetadataDao;
     this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -111,6 +116,7 @@ public class QueryChangeStreamAction {
     this.heartbeatRecordAction = heartbeatRecordAction;
     this.childPartitionsRecordAction = childPartitionsRecordAction;
     this.metrics = metrics;
+    this.throughputEstimator = throughputEstimator;
   }
 
   /**
@@ -212,6 +218,14 @@ public class QueryChangeStreamAction {
               LOG.error("[" + token + "] Unknown record type " + record.getClass());
               throw new IllegalArgumentException("Unknown record type " + record.getClass());
             }
+
+            // The size of a record is approximated by the number of bytes in the
+            // UTF-8 encoding of its string representation. This yields only a rough
+            // estimate, not an accurate throughput measurement.
+            this.throughputEstimator.update(
+                record.getRecordTimestamp(),
+                record.toString().getBytes(StandardCharsets.UTF_8).length);
+
             if (maybeContinuation.isPresent()) {
               LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
               bundleFinalizer.afterBundleCommit(
@@ -221,7 +235,6 @@ public class QueryChangeStreamAction {
             }
           }
         }
-
         bundleFinalizer.afterBundleCommit(
             Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
             updateWatermarkCallback(token, watermarkEstimator));
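
For context, the estimator is fed the UTF-8 byte length of each record's string
form. A minimal standalone sketch of that sizing (the helper name is
hypothetical):

    import java.nio.charset.StandardCharsets;

    // Approximate a record's size by the UTF-8 byte length of its string
    // representation, mirroring the update() call added above.
    static long approximateSizeInBytes(Object record) {
      return record.toString().getBytes(StandardCharsets.UTF_8).length;
    }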
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index aa44d397902..6c1ab8feaf9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -24,6 +24,7 @@ import io.opencensus.trace.AttributeValue;
 import io.opencensus.trace.Tracer;
 import io.opencensus.trace.Tracing;
 import java.io.Serializable;
+import java.math.BigDecimal;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadata
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampUtils;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -67,11 +69,13 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
   private static final long serialVersionUID = -7574596218085711975L;
   private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
   private static final Tracer TRACER = Tracing.getTracer();
+  private static final double AUTOSCALING_SIZE_MULTIPLIER = 2.0D;
 
   private final DaoFactory daoFactory;
   private final MapperFactory mapperFactory;
   private final ActionFactory actionFactory;
   private final ChangeStreamMetrics metrics;
+  private final ThroughputEstimator throughputEstimator;
 
   private transient QueryChangeStreamAction queryChangeStreamAction;
 
@@ -88,16 +92,19 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
    * @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
    * @param actionFactory the {@link ActionFactory} to construct actions
    * @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
+   * @param throughputEstimator an estimator to calculate local throughput.
    */
   public ReadChangeStreamPartitionDoFn(
       DaoFactory daoFactory,
       MapperFactory mapperFactory,
       ActionFactory actionFactory,
-      ChangeStreamMetrics metrics) {
+      ChangeStreamMetrics metrics,
+      ThroughputEstimator throughputEstimator) {
     this.daoFactory = daoFactory;
     this.mapperFactory = mapperFactory;
     this.actionFactory = actionFactory;
     this.metrics = metrics;
+    this.throughputEstimator = throughputEstimator;
   }
 
   @GetInitialWatermarkEstimatorState
@@ -146,6 +153,25 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
     return TimestampRange.of(startTimestamp, endTimestamp);
   }
 
+  @GetSize
+  public double getSize(@Element PartitionMetadata partition, @Restriction TimestampRange range)
+      throws Exception {
+    final BigDecimal timeGapInSeconds =
+        BigDecimal.valueOf(newTracker(partition, range).getProgress().getWorkRemaining());
+    final BigDecimal throughput = BigDecimal.valueOf(this.throughputEstimator.get());
+    LOG.debug(
+        "Reported getSize() - remaining work: " + timeGapInSeconds + " throughput:" + throughput);
+    // Cap it at Double.MAX_VALUE to avoid an overflow.
+    return timeGapInSeconds
+        .multiply(throughput)
+        // The multiplier is required because, without it, the job scales down to the
+        // minimum number of workers, which leads to very high CPU utilization. Inflating
+        // the reported size reduces the CPU usage. In the future, this can become a custom parameter.
+        .multiply(BigDecimal.valueOf(AUTOSCALING_SIZE_MULTIPLIER))
+        .min(BigDecimal.valueOf(Double.MAX_VALUE))
+        .doubleValue();
+  }
+
   @NewTracker
   public ReadChangeStreamPartitionRangeTracker newTracker(
       @Element PartitionMetadata partition, @Restriction TimestampRange range) {
@@ -180,7 +206,8 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
             dataChangeRecordAction,
             heartbeatRecordAction,
             childPartitionsRecordAction,
-            metrics);
+            metrics,
+            throughputEstimator);
   }
 
   /**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
new file mode 100644
index 00000000000..513e742dbde
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
+
+/** An estimator that provides a rolling estimate of the throughput of the output elements. */
+public class ThroughputEstimator implements Serializable {
+
+  private static final long serialVersionUID = -3597929310338724800L;
+  // The start time of each per-second window.
+  private Timestamp startTimeOfCurrentWindow;
+  // The bytes of the current window.
+  private BigDecimal bytesInCurrentWindow;
+  // The number of seconds to look back.
+  private final int numOfSeconds = 60;
+  // The total bytes of all windows in the queue.
+  private BigDecimal bytesInQueue;
+  // The queue holds the most recent windows, which are used to calculate
+  // a rolling-window throughput.
+  private final Queue<ImmutablePair<Timestamp, BigDecimal>> queue;
+
+  public ThroughputEstimator() {
+    queue = new ArrayDeque<>();
+    startTimeOfCurrentWindow = Timestamp.MIN_VALUE;
+    bytesInCurrentWindow = BigDecimal.valueOf(0L);
+    bytesInQueue = BigDecimal.valueOf(0L);
+  }
+
+  /**
+   * Updates the estimator with the bytes of records.
+   *
+   * @param timeOfRecords the committed timestamp of the records
+   * @param bytes the total bytes of the records
+   */
+  public void update(Timestamp timeOfRecords, long bytes) {
+    BigDecimal bytesNum = BigDecimal.valueOf(bytes);
+    if (startTimeOfCurrentWindow.equals(Timestamp.MIN_VALUE)) {
+      bytesInCurrentWindow = bytesNum;
+      startTimeOfCurrentWindow = timeOfRecords;
+      return;
+    }
+
+    if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) {
+      bytesInCurrentWindow = bytesInCurrentWindow.add(bytesNum);
+    } else {
+      queue.add(new ImmutablePair<>(startTimeOfCurrentWindow, bytesInCurrentWindow));
+      bytesInQueue = bytesInQueue.add(bytesInCurrentWindow);
+
+      bytesInCurrentWindow = bytesNum;
+      startTimeOfCurrentWindow = timeOfRecords;
+    }
+    cleanQueue(startTimeOfCurrentWindow);
+  }
+
+  /** Returns the estimated throughput for now. */
+  public double get() {
+    return getFrom(Timestamp.now());
+  }
+
+  /**
+   * Returns the estimated throughput for a specified time.
+   *
+   * @param time the specified timestamp to check throughput
+   */
+  public double getFrom(Timestamp time) {
+    cleanQueue(time);
+    if (queue.size() == 0) {
+      return 0D;
+    }
+    return bytesInQueue
+        .divide(BigDecimal.valueOf(queue.size()), MathContext.DECIMAL128)
+        .doubleValue();
+  }
+
+  private void cleanQueue(Timestamp time) {
+    while (queue.size() != 0) {
+      ImmutablePair<Timestamp, BigDecimal> peek = queue.peek();
+      if (peek != null && peek.getLeft().getSeconds() >= time.getSeconds() - numOfSeconds) {
+        break;
+      }
+      // Remove the element: the timestamp of the first element falls outside
+      // the look-back window.
+      ImmutablePair<Timestamp, BigDecimal> pair = queue.remove();
+      bytesInQueue = bytesInQueue.subtract(pair.getRight());
+    }
+  }
+}
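
A minimal usage sketch of the estimator above (timestamps and byte counts are
made up):

    ThroughputEstimator estimator = new ThroughputEstimator();
    // Two records committed within the same one-second window starting at t=10s.
    estimator.update(Timestamp.ofTimeSecondsAndNanos(10, 0), 100);
    estimator.update(Timestamp.ofTimeSecondsAndNanos(10, 0), 200);
    // A record at t=12s closes the first window and pushes it into the queue.
    estimator.update(Timestamp.ofTimeSecondsAndNanos(12, 0), 300);
    // Average bytes per queued window over the last 60 seconds: (100 + 200) / 1 = 300.
    double bytesPerSecond = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(13, 0));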
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
index 8931c66c3c7..d17d691cdc8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
@@ -27,10 +27,14 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 
 import com.google.cloud.Timestamp;
 import java.math.BigDecimal;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link RestrictionTracker} for claiming positions in a {@link TimestampRange} in a
@@ -45,12 +49,20 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Timestamp>
     implements HasProgress {
 
+  private static final Logger LOG = LoggerFactory.getLogger(TimestampRangeTracker.class);
   protected TimestampRange range;
   protected @Nullable Timestamp lastAttemptedPosition;
   protected @Nullable Timestamp lastClaimedPosition;
+  protected Supplier<Timestamp> timeSupplier;
 
   public TimestampRangeTracker(TimestampRange range) {
     this.range = checkNotNull(range);
+    this.timeSupplier = () -> Timestamp.now();
+  }
+
+  @VisibleForTesting
+  public void setTimeSupplier(Supplier<Timestamp> timeSupplier) {
+    this.timeSupplier = timeSupplier;
   }
 
   /**
@@ -182,43 +194,36 @@ public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Ti
   }
 
   /**
-   * Returns the progress made within the restriction so far. This progress is returned in a
-   * normalized fashion from the interval [0, 1]. Zero means no work indicates no work (completed or
-   * remaining), while 1 indicates all work (completed or remaining).
+   * Returns the progress made within the restriction so far. If lastClaimedPosition is null, the
+   * start of the range is used as the completed work; otherwise, lastClaimedPosition is used. The
+   * time gap between that position and now is reported as the remaining work. Thus, as the gap
+   * grows, so does the reported backlog, signaling that more resources should be added.
    *
-   * <p>If no position was attempted, it will return {@code workCompleted} as 0 and {@code
-   * workRemaining} as 1. If a position was attempted, it will return the fraction of work completed
-   * and work remaining based on the offset the position represents in the restriction range. If the
-   * last position attempted was greater than the end of the restriction range, it will return
-   * {@code workCompleted} as 1 and {@code workRemaining} as 0.
-   *
-   * @return work completed and work remaining as fractions between [0, 1]
+   * @return work completed and work remaining in seconds.
    */
   @Override
   public Progress getProgress() {
-    final BigDecimal fromInNanos = toNanos(range.getFrom());
-    final BigDecimal toInNanos = toNanos(range.getTo());
-    final BigDecimal totalWork = toInNanos.subtract(fromInNanos, DECIMAL128);
-
-    if (lastAttemptedPosition == null) {
-      final double workCompleted = 0D;
-      final double workRemaining = 1D;
-
-      return Progress.from(workCompleted, workRemaining);
+    final BigDecimal now = BigDecimal.valueOf(timeSupplier.get().getSeconds());
+    BigDecimal current;
+    if (lastClaimedPosition == null) {
+      current = BigDecimal.valueOf(range.getFrom().getSeconds());
     } else {
-      final BigDecimal currentInNanos = toNanos(lastAttemptedPosition);
-      final BigDecimal workRemainingInNanos =
-          toInNanos.subtract(currentInNanos, DECIMAL128).max(BigDecimal.ZERO);
-
-      final double workCompleted =
-          totalWork
-              .subtract(workRemainingInNanos, DECIMAL128)
-              .divide(totalWork, DECIMAL128)
-              .doubleValue();
-      final double workRemaining = workRemainingInNanos.divide(totalWork, DECIMAL128).doubleValue();
-
-      return Progress.from(workCompleted, workRemaining);
+      current = BigDecimal.valueOf(lastClaimedPosition.getSeconds());
     }
+    // The remaining work must be greater than 0. Otherwise, the watermark
+    // will not advance.
+    final BigDecimal workRemaining = now.subtract(current).max(BigDecimal.ONE);
+
+    LOG.debug(
+        "Reported progress - current:"
+            + current.doubleValue()
+            + " now:"
+            + now.doubleValue()
+            + " workRemaining:"
+            + workRemaining.doubleValue());
+
+    return Progress.from(current.doubleValue(), workRemaining.doubleValue());
   }
 
   @Override
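
As a worked example of the seconds-based progress above (values are
hypothetical): if the last claimed position is at t=100s and the injected time
supplier returns t=160s, then:

    double workCompleted = 100D;                      // seconds of lastClaimedPosition
    double workRemaining = Math.max(160D - 100D, 1D); // 60 seconds of backlog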
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 38e00e519ac..4f83aa5e827 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRec
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -64,6 +65,7 @@ public class QueryChangeStreamActionTest {
   private PartitionMetadataDao partitionMetadataDao;
   private PartitionMetadata partition;
   private ChangeStreamMetrics metrics;
+  private ThroughputEstimator throughputEstimator;
   private TimestampRange restriction;
   private RestrictionTracker<TimestampRange, Timestamp> restrictionTracker;
   private OutputReceiver<DataChangeRecord> outputReceiver;
@@ -86,6 +88,7 @@ public class QueryChangeStreamActionTest {
     heartbeatRecordAction = mock(HeartbeatRecordAction.class);
     childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
     metrics = mock(ChangeStreamMetrics.class);
+    throughputEstimator = mock(ThroughputEstimator.class);
 
     action =
         new QueryChangeStreamAction(
@@ -96,7 +99,8 @@ public class QueryChangeStreamActionTest {
             dataChangeRecordAction,
             heartbeatRecordAction,
             childPartitionsRecordAction,
-            metrics);
+            metrics,
+            throughputEstimator);
     final Struct row = mock(Struct.class);
     partition =
         PartitionMetadata.newBuilder()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 6aa5acb493a..6c791b706f3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -79,6 +80,7 @@ public class ReadChangeStreamPartitionDoFnTest {
     final DaoFactory daoFactory = mock(DaoFactory.class);
     final MapperFactory mapperFactory = mock(MapperFactory.class);
     final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
+    final ThroughputEstimator throughputEstimator = mock(ThroughputEstimator.class);
     final ActionFactory actionFactory = mock(ActionFactory.class);
     final PartitionMetadataDao partitionMetadataDao = mock(PartitionMetadataDao.class);
     final ChangeStreamDao changeStreamDao = mock(ChangeStreamDao.class);
@@ -89,7 +91,9 @@ public class ReadChangeStreamPartitionDoFnTest {
     childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
     queryChangeStreamAction = mock(QueryChangeStreamAction.class);
 
-    doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+    doFn =
+        new ReadChangeStreamPartitionDoFn(
+            daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
 
     partition =
         PartitionMetadata.newBuilder()
@@ -126,7 +130,8 @@ public class ReadChangeStreamPartitionDoFnTest {
             dataChangeRecordAction,
             heartbeatRecordAction,
             childPartitionsRecordAction,
-            metrics))
+            metrics,
+            throughputEstimator))
         .thenReturn(queryChangeStreamAction);
 
     doFn.setup();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
index 786ca52b3d0..202179bd915 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
@@ -47,7 +47,6 @@ import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -85,7 +84,6 @@ public class SpannerChangeStreamIT {
     pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
   }
 
-  @Ignore("BEAM-14277 Sickbay until autoscaling changes are merged")
   @Test
   public void testReadSpannerChangeStream() {
     // Defines how many rows are going to be inserted / updated / deleted in the test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
index f1b6fbc3098..eb5b9e3ba15 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -84,7 +83,6 @@ public class SpannerChangeStreamTransactionBoundariesIT {
     databaseClient = ENV.getDatabaseClient();
   }
 
-  @Ignore("BEAM-14277 Sickbay until autoscaling changes are merged")
   @Test
   public void testTransactionBoundaries() {
     LOG.info("Test pipeline: " + pipeline.toString());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
new file mode 100644
index 00000000000..1c1282dd446
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ThroughputEstimatorTest {
+  private static final double DELTA = 1e-10;
+  private ThroughputEstimator estimator;
+
+  @Before
+  public void setup() {
+    estimator = new ThroughputEstimator();
+  }
+
+  @Test
+  public void testThroughputCalculation() {
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(20, 0), 10);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(30, 0), 20);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(59, 0), 30);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 40); // Exclusive
+    assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(61, 0)), DELTA);
+
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(100, 0), 10);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 20);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 10);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(140, 0), 40); // Exclusive
+    assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(141, 0)), DELTA);
+
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(201, 0), 10);
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(250, 0), 40); // Exclusive
+    assertEquals(10D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(261, 0)), DELTA);
+
+    assertEquals(0D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(350, 0)), DELTA);
+  }
+
+  @Test
+  public void testThroughputIsAccumulatedWithin60SecondsWindow() {
+    List<ImmutablePair<Timestamp, Long>> pairs = generateTestData(100, 0, 60, Long.MAX_VALUE);
+    pairs.sort((a, b) -> a.getLeft().compareTo(b.getLeft()));
+
+    final long count = pairs.stream().map(ImmutablePair::getLeft).distinct().count();
+    BigDecimal sum = BigDecimal.valueOf(0L);
+    for (ImmutablePair<Timestamp, Long> pair : pairs) {
+      sum = sum.add(BigDecimal.valueOf(pair.getRight()));
+    }
+    final BigDecimal want = sum.divide(BigDecimal.valueOf(count), MathContext.DECIMAL128);
+
+    for (int i = 0; i < pairs.size(); i++) {
+      estimator.update(pairs.get(i).getLeft(), pairs.get(i).getRight());
+    }
+
+    // This is needed to push the current window into the queue.
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 10);
+    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(60, 0));
+    assertEquals(want.doubleValue(), actual, DELTA);
+  }
+
+  @Test
+  public void testThroughputIsAccumulatedWithin300SecondsWindow() {
+    List<ImmutablePair<Timestamp, Long>> excludedPairs =
+        generateTestData(300, 0, 240, Long.MAX_VALUE);
+    List<ImmutablePair<Timestamp, Long>> expectedPairs =
+        generateTestData(50, 240, 300, Long.MAX_VALUE);
+    List<ImmutablePair<Timestamp, Long>> pairs =
+        Stream.concat(excludedPairs.stream(), expectedPairs.stream()).collect(Collectors.toList());
+    pairs.sort((a, b) -> a.getLeft().compareTo(b.getLeft()));
+
+    final long count = expectedPairs.stream().map(ImmutablePair::getLeft).distinct().count();
+    BigDecimal sum = BigDecimal.valueOf(0L);
+    for (ImmutablePair<Timestamp, Long> pair : expectedPairs) {
+      sum = sum.add(BigDecimal.valueOf(pair.getRight()));
+    }
+    final BigDecimal want = sum.divide(BigDecimal.valueOf(count), MathContext.DECIMAL128);
+    for (int i = 0; i < pairs.size(); i++) {
+      estimator.update(pairs.get(i).getLeft(), pairs.get(i).getRight());
+    }
+
+    // This is needed to push the current window into the queue.
+    estimator.update(Timestamp.ofTimeSecondsAndNanos(300, 0), 10);
+    double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(300, 0));
+    assertEquals(want.doubleValue(), actual, DELTA);
+  }
+
+  private List<ImmutablePair<Timestamp, Long>> generateTestData(
+      int size, int startSeconds, int endSeconds, long maxBytes) {
+    Random random = new Random();
+    List<ImmutablePair<Timestamp, Long>> pairs = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      int seconds = random.nextInt(endSeconds - startSeconds) + startSeconds;
+      pairs.add(
+          new ImmutablePair<>(
+              Timestamp.ofTimeSecondsAndNanos(seconds, 0),
+              ThreadLocalRandom.current().nextLong(maxBytes)));
+    }
+    return pairs;
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
index f8a879fb690..5a016164849 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
@@ -41,7 +41,7 @@ import org.junit.runner.RunWith;
 
 @RunWith(JUnitQuickcheck.class)
 public class TimestampRangeTrackerTest {
-  private static final double DELTA = 1e-15;
+  private static final double DELTA = 1e-10;
 
   @Property
   public void testTryClaimReturnsTrueWhenPositionIsWithinTheRange(
@@ -221,82 +221,92 @@ public class TimestampRangeTrackerTest {
   }
 
   @Property
-  public void testGetProgressWorkCompletedAndWorkRemainingEqualsToOne(
+  public void testGetProgressWorkCompletedAndWorkRemaining(
       @From(TimestampGenerator.class) Timestamp from,
       @From(TimestampGenerator.class) Timestamp to,
       @From(TimestampGenerator.class) Timestamp position) {
+    assumeThat(from, greaterThanOrEqualTo(Timestamp.ofTimeSecondsAndNanos(0, 0)));
     assumeThat(from, lessThanOrEqualTo(to));
     assumeThat(position, greaterThanOrEqualTo(from));
     assumeThat(position, lessThan(to));
 
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0));
 
     tracker.tryClaim(position);
     final Progress progress = tracker.getProgress();
 
-    assertEquals(1D, progress.getWorkCompleted() + progress.getWorkRemaining(), DELTA);
+    assertEquals(position.getSeconds(), progress.getWorkCompleted(), DELTA);
+    assertEquals(10D, progress.getWorkRemaining(), DELTA);
   }
 
   @Test
   public void testGetProgressReturnsWorkRemainingAsWholeRangeWhenNoClaimWasAttempted() {
-    final Timestamp from = Timestamp.MIN_VALUE;
+    final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
     final Timestamp to = Timestamp.now();
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
+    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
+
     final Progress progress = tracker.getProgress();
     assertEquals(0D, progress.getWorkCompleted(), DELTA);
-    assertEquals(1D, progress.getWorkRemaining(), DELTA);
+    assertEquals(to.getSeconds(), progress.getWorkRemaining(), DELTA);
   }
 
   @Test
   public void testGetProgressReturnsWorkRemainingAsRangeEndMinusAttemptedPosition() {
     final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
-    final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100);
-    final Timestamp position = Timestamp.ofTimeSecondsAndNanos(0, 30);
+    final Timestamp to = Timestamp.ofTimeSecondsAndNanos(100, 0);
+    final Timestamp position = Timestamp.ofTimeSecondsAndNanos(30, 0);
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
+    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
+
     tracker.tryClaim(position);
     final Progress progress = tracker.getProgress();
 
     assertTrue(progress.getWorkCompleted() >= 0);
-    assertEquals(0.3D, progress.getWorkCompleted(), DELTA);
+    assertEquals(30D, progress.getWorkCompleted(), DELTA);
     assertTrue(progress.getWorkRemaining() >= 0);
-    assertEquals(0.7D, progress.getWorkRemaining(), DELTA);
+    assertEquals(70D, progress.getWorkRemaining(), DELTA);
   }
 
   @Test
   public void testGetProgressReturnsWorkCompletedAsOneWhenRangeEndHasBeenAttempted() {
     final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
-    final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100);
+    final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
-    tracker.tryClaim(to);
+    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
+    tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(100, 0));
+    tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(101, 0));
     final Progress progress = tracker.getProgress();
 
     assertTrue(progress.getWorkCompleted() >= 0);
-    assertEquals(1D, progress.getWorkCompleted(), DELTA);
+    assertEquals(100D, progress.getWorkCompleted(), DELTA);
     assertTrue(progress.getWorkRemaining() >= 0);
-    assertEquals(0D, progress.getWorkRemaining(), DELTA);
+    assertEquals(1D, progress.getWorkRemaining(), DELTA);
   }
 
   @Test
   public void testGetProgressReturnsWorkCompletedAsOneWhenPastRangeEndHasBeenAttempted() {
     final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
-    final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100);
-    final Timestamp position = Timestamp.ofTimeSecondsAndNanos(0, 101);
+    final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
+    final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0);
     final TimestampRange range = TimestampRange.of(from, to);
     final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
 
+    tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds(), 0));
     tracker.tryClaim(position);
     final Progress progress = tracker.getProgress();
 
     assertTrue(progress.getWorkCompleted() >= 0);
-    assertEquals(1D, progress.getWorkCompleted(), DELTA);
+    assertEquals(0D, progress.getWorkCompleted(), DELTA);
     assertTrue(progress.getWorkRemaining() >= 0);
-    assertEquals(0D, progress.getWorkRemaining(), DELTA);
+    assertEquals(101D, progress.getWorkRemaining(), DELTA);
   }
 }