You are viewing a plain-text version of this content; the link to the canonical version ("here") was not preserved in this copy.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/04/14 18:53:56 UTC
[beam] branch master updated: Merge pull request #17200 from [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8a33871e6b2 Merge pull request #17200 from [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector
8a33871e6b2 is described below
commit 8a33871e6b288db445fa1a0267d863a56eba0ae6
Author: Hengfeng Li <he...@google.com>
AuthorDate: Fri Apr 15 04:53:47 2022 +1000
Merge pull request #17200 from [BEAM-12164]: fix the autoscaling backlog estimation for Spanner Change Streams Connector
* fix: fix incorrect backlog of restriction tracker
* Clean up the code of BEAM-14194.
* Enable spanner change streams tests BEAM-14277.
* Add a multiplier for the reported backlog size.
* Use BigDecimal inside the throughput estimator.
---
.../dataflow/DataflowPipelineTranslator.java | 27 -----
.../dataflow/DataflowPipelineTranslatorTest.java | 67 -----------
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 5 +-
.../changestreams/action/ActionFactory.java | 8 +-
.../action/QueryChangeStreamAction.java | 17 ++-
.../dofn/ReadChangeStreamPartitionDoFn.java | 31 ++++-
.../restriction/ThroughputEstimator.java | 109 ++++++++++++++++++
.../restriction/TimestampRangeTracker.java | 67 ++++++-----
.../action/QueryChangeStreamActionTest.java | 6 +-
.../dofn/ReadChangeStreamPartitionDoFnTest.java | 9 +-
.../changestreams/it/SpannerChangeStreamIT.java | 2 -
...SpannerChangeStreamTransactionBoundariesIT.java | 2 -
.../restriction/ThroughputEstimatorTest.java | 126 +++++++++++++++++++++
.../restriction/TimestampRangeTrackerTest.java | 44 ++++---
14 files changed, 364 insertions(+), 156 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 41134adc3d0..b46cedb4297 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -63,7 +63,6 @@ import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.OutputReference;
@@ -74,7 +73,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -407,8 +405,6 @@ public class DataflowPipelineTranslator {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
AutoscalingSettings settings = new AutoscalingSettings();
- // TODO: Remove this once autoscaling is supported for SpannerIO.readChangeStream
- assertSpannerChangeStreamsNoAutoScaling(options);
if (options.getAutoscalingAlgorithm() != null) {
settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
}
@@ -608,29 +604,6 @@ public class DataflowPipelineTranslator {
return parents.peekFirst().toAppliedPTransform(getPipeline());
}
}
-
- // TODO: Remove this once the autoscaling is supported for Spanner change streams
- private void assertSpannerChangeStreamsNoAutoScaling(DataflowPipelineOptions options) {
- if (isSpannerChangeStream(options) && !isAutoScalingAlgorithmNone(options)) {
- throw new IllegalArgumentException(
- "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE.");
- }
- }
-
- private boolean isSpannerChangeStream(DataflowPipelineOptions options) {
- try {
- final SpannerChangeStreamOptions spannerOptions =
- options.as(SpannerChangeStreamOptions.class);
- final String metadataTable = spannerOptions.getMetadataTable();
- return metadataTable != null && !metadataTable.isEmpty();
- } catch (Exception e) {
- return false;
- }
- }
-
- private boolean isAutoScalingAlgorithmNone(DataflowPipelineOptions options) {
- return AutoscalingAlgorithmType.NONE.equals(options.getAutoscalingAlgorithm());
- }
}
static class StepTranslator implements StepTranslationContext {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 58ad2a49dde..82297b9784a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -67,7 +67,6 @@ import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
@@ -85,7 +84,6 @@ import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.gcp.spanner.SpannerIO.SpannerChangeStreamOptions;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -428,71 +426,6 @@ public class DataflowPipelineTranslatorTest implements Serializable {
.intValue());
}
- @Test
- public void testSuccessWhenSpannerChangeStreamsAndAutoscalingEqualToNone() throws IOException {
- final DataflowPipelineOptions options = buildPipelineOptions();
- options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.NONE);
- options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
- final Pipeline p = buildPipeline(options);
- final SdkComponents sdkComponents = createSdkComponents(options);
- final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
-
- final JobSpecification jobSpecification =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p,
- pipelineProto,
- sdkComponents,
- DataflowRunner.fromOptions(options),
- Collections.emptyList());
- assertNotNull(jobSpecification);
- }
-
- @Test
- public void testExceptionIsThrownWhenSpannerChangeStreamsAndAutoscalingDifferentThanNone()
- throws IOException {
- final DataflowPipelineOptions options = buildPipelineOptions();
- options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
- options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
- final Pipeline p = buildPipeline(options);
- final SdkComponents sdkComponents = createSdkComponents(options);
- final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p,
- pipelineProto,
- sdkComponents,
- DataflowRunner.fromOptions(options),
- Collections.emptyList());
- }
-
- @Test
- public void testExceptionIsThrownWhenSpannerChangeStreamsAndNoAutoscalingSpecified()
- throws IOException {
- final DataflowPipelineOptions options = buildPipelineOptions();
- options.as(SpannerChangeStreamOptions.class).setMetadataTable("MyMetadataTable");
- final Pipeline p = buildPipeline(options);
- final SdkComponents sdkComponents = createSdkComponents(options);
- final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p, sdkComponents, true);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage(
- "Autoscaling is not supported for SpannerIO.readChangeStreams. Please disable it by specifying the autoscaling algorithm as NONE");
- final JobSpecification jobSpecification =
- DataflowPipelineTranslator.fromOptions(options)
- .translate(
- p,
- pipelineProto,
- sdkComponents,
- DataflowRunner.fromOptions(options),
- Collections.emptyList());
- assertNotNull(jobSpecification);
- }
-
@Test
public void testNumWorkersCannotExceedMaxNumWorkers() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 95af7ff912a..66a9ad1011c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.PostProcessingMetri
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
@@ -1595,6 +1596,7 @@ public class SpannerIO {
: getInclusiveEndAt();
final MapperFactory mapperFactory = new MapperFactory();
final ChangeStreamMetrics metrics = new ChangeStreamMetrics();
+ final ThroughputEstimator throughputEstimator = new ThroughputEstimator();
final RpcPriority rpcPriority =
MoreObjects.firstNonNull(getRpcPriority(), RpcPriority.HIGH);
final DaoFactory daoFactory =
@@ -1612,7 +1614,8 @@ public class SpannerIO {
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
new DetectNewPartitionsDoFn(daoFactory, mapperFactory, actionFactory, metrics);
final ReadChangeStreamPartitionDoFn readChangeStreamPartitionDoFn =
- new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+ new ReadChangeStreamPartitionDoFn(
+ daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index cf778343cb3..cca8506d0e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.joda.time.Duration;
/**
@@ -108,6 +109,7 @@ public class ActionFactory implements Serializable {
* @param childPartitionsRecordAction action class to process {@link
* org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
* @param metrics metrics gathering class
+ * @param throughputEstimator an estimator to calculate local throughput.
* @return single instance of the {@link QueryChangeStreamAction}
*/
public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -118,7 +120,8 @@ public class ActionFactory implements Serializable {
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
ChildPartitionsRecordAction childPartitionsRecordAction,
- ChangeStreamMetrics metrics) {
+ ChangeStreamMetrics metrics,
+ ThroughputEstimator throughputEstimator) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
@@ -129,7 +132,8 @@ public class ActionFactory implements Serializable {
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
- metrics);
+ metrics,
+ throughputEstimator);
}
return queryChangeStreamActionInstance;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index ceaa68b63e9..96791572d00 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -26,6 +26,7 @@ import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRec
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -79,6 +81,7 @@ public class QueryChangeStreamAction {
private final HeartbeatRecordAction heartbeatRecordAction;
private final ChildPartitionsRecordAction childPartitionsRecordAction;
private final ChangeStreamMetrics metrics;
+ private final ThroughputEstimator throughputEstimator;
/**
* Constructs an action class for performing a change stream query for a given partition.
@@ -93,6 +96,7 @@ public class QueryChangeStreamAction {
* @param heartbeatRecordAction action class to process {@link HeartbeatRecord}s
* @param childPartitionsRecordAction action class to process {@link ChildPartitionsRecord}s
* @param metrics metrics gathering class
+ * @param throughputEstimator an estimator to calculate local throughput.
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
@@ -102,7 +106,8 @@ public class QueryChangeStreamAction {
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
ChildPartitionsRecordAction childPartitionsRecordAction,
- ChangeStreamMetrics metrics) {
+ ChangeStreamMetrics metrics,
+ ThroughputEstimator throughputEstimator) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -111,6 +116,7 @@ public class QueryChangeStreamAction {
this.heartbeatRecordAction = heartbeatRecordAction;
this.childPartitionsRecordAction = childPartitionsRecordAction;
this.metrics = metrics;
+ this.throughputEstimator = throughputEstimator;
}
/**
@@ -212,6 +218,14 @@ public class QueryChangeStreamAction {
LOG.error("[" + token + "] Unknown record type " + record.getClass());
throw new IllegalArgumentException("Unknown record type " + record.getClass());
}
+
+ // The size of a record is represented by the number of bytes needed for the
+ // string representation of the record. Here, we only try to achieve an estimate
+ // instead of an accurate throughput.
+ this.throughputEstimator.update(
+ record.getRecordTimestamp(),
+ record.toString().getBytes(StandardCharsets.UTF_8).length);
+
if (maybeContinuation.isPresent()) {
LOG.debug("[" + token + "] Continuation present, returning " + maybeContinuation);
bundleFinalizer.afterBundleCommit(
@@ -221,7 +235,6 @@ public class QueryChangeStreamAction {
}
}
}
-
bundleFinalizer.afterBundleCommit(
Instant.now().plus(BUNDLE_FINALIZER_TIMEOUT),
updateWatermarkCallback(token, watermarkEstimator));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index aa44d397902..6c1ab8feaf9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -24,6 +24,7 @@ import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.io.Serializable;
+import java.math.BigDecimal;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
@@ -39,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadata
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampUtils;
import org.apache.beam.sdk.transforms.DoFn;
@@ -67,11 +69,13 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
private static final long serialVersionUID = -7574596218085711975L;
private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
private static final Tracer TRACER = Tracing.getTracer();
+ private static final double AUTOSCALING_SIZE_MULTIPLIER = 2.0D;
private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
+ private final ThroughputEstimator throughputEstimator;
private transient QueryChangeStreamAction queryChangeStreamAction;
@@ -88,16 +92,19 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
* @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
* @param actionFactory the {@link ActionFactory} to construct actions
* @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
+ * @param throughputEstimator an estimator to calculate local throughput.
*/
public ReadChangeStreamPartitionDoFn(
DaoFactory daoFactory,
MapperFactory mapperFactory,
ActionFactory actionFactory,
- ChangeStreamMetrics metrics) {
+ ChangeStreamMetrics metrics,
+ ThroughputEstimator throughputEstimator) {
this.daoFactory = daoFactory;
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
+ this.throughputEstimator = throughputEstimator;
}
@GetInitialWatermarkEstimatorState
@@ -146,6 +153,25 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
return TimestampRange.of(startTimestamp, endTimestamp);
}
+ @GetSize
+ public double getSize(@Element PartitionMetadata partition, @Restriction TimestampRange range)
+ throws Exception {
+ final BigDecimal timeGapInSeconds =
+ BigDecimal.valueOf(newTracker(partition, range).getProgress().getWorkRemaining());
+ final BigDecimal throughput = BigDecimal.valueOf(this.throughputEstimator.get());
+ LOG.debug(
+ "Reported getSize() - remaining work: " + timeGapInSeconds + " throughput:" + throughput);
+ // Cap it at Double.MAX_VALUE to avoid an overflow.
+ return timeGapInSeconds
+ .multiply(throughput)
+ // The multiplier is required because the job tries to reach the minimum number of workers
+ // and this leads to a very high cpu utilization. The multiplier would increase the reported
+ // size and help to reduce the cpu usage. In the future, this can become a custom parameter.
+ .multiply(BigDecimal.valueOf(AUTOSCALING_SIZE_MULTIPLIER))
+ .min(BigDecimal.valueOf(Double.MAX_VALUE))
+ .doubleValue();
+ }
+
@NewTracker
public ReadChangeStreamPartitionRangeTracker newTracker(
@Element PartitionMetadata partition, @Restriction TimestampRange range) {
@@ -180,7 +206,8 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
- metrics);
+ metrics,
+ throughputEstimator);
}
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
new file mode 100644
index 00000000000..513e742dbde
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import com.google.cloud.Timestamp;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
+
+/** An estimator to provide an estimate on the throughput of the outputted elements. */
+public class ThroughputEstimator implements Serializable {
+
+ private static final long serialVersionUID = -3597929310338724800L;
+ // The start time of each per-second window.
+ private Timestamp startTimeOfCurrentWindow;
+ // The bytes of the current window.
+ private BigDecimal bytesInCurrentWindow;
+ // The number of seconds to look in the past.
+ private final int numOfSeconds = 60;
+ // The total bytes of all windows in the queue.
+ private BigDecimal bytesInQueue;
+ // The queue holds a number of windows in the past in order to calculate
+ // a rolling windowing throughput.
+ private final Queue<ImmutablePair<Timestamp, BigDecimal>> queue;
+
+ public ThroughputEstimator() {
+ queue = new ArrayDeque<>();
+ startTimeOfCurrentWindow = Timestamp.MIN_VALUE;
+ bytesInCurrentWindow = BigDecimal.valueOf(0L);
+ bytesInQueue = BigDecimal.valueOf(0L);
+ }
+
+ /**
+ * Updates the estimator with the bytes of records.
+ *
+ * @param timeOfRecords the committed timestamp of the records
+ * @param bytes the total bytes of the records
+ */
+ public void update(Timestamp timeOfRecords, long bytes) {
+ BigDecimal bytesNum = BigDecimal.valueOf(bytes);
+ if (startTimeOfCurrentWindow.equals(Timestamp.MIN_VALUE)) {
+ bytesInCurrentWindow = bytesNum;
+ startTimeOfCurrentWindow = timeOfRecords;
+ return;
+ }
+
+ if (timeOfRecords.getSeconds() < startTimeOfCurrentWindow.getSeconds() + 1) {
+ bytesInCurrentWindow = bytesInCurrentWindow.add(bytesNum);
+ } else {
+ queue.add(new ImmutablePair<>(startTimeOfCurrentWindow, bytesInCurrentWindow));
+ bytesInQueue = bytesInQueue.add(bytesInCurrentWindow);
+
+ bytesInCurrentWindow = bytesNum;
+ startTimeOfCurrentWindow = timeOfRecords;
+ }
+ cleanQueue(startTimeOfCurrentWindow);
+ }
+
+ /** Returns the estimated throughput for now. */
+ public double get() {
+ return getFrom(Timestamp.now());
+ }
+
+ /**
+ * Returns the estimated throughput for a specified time.
+ *
+ * @param time the specified timestamp to check throughput
+ */
+ public double getFrom(Timestamp time) {
+ cleanQueue(time);
+ if (queue.size() == 0) {
+ return 0D;
+ }
+ return bytesInQueue
+ .divide(BigDecimal.valueOf(queue.size()), MathContext.DECIMAL128)
+ .doubleValue();
+ }
+
+ private void cleanQueue(Timestamp time) {
+ while (queue.size() != 0) {
+ ImmutablePair<Timestamp, BigDecimal> peek = queue.peek();
+ if (peek != null && peek.getLeft().getSeconds() >= time.getSeconds() - numOfSeconds) {
+ break;
+ }
+ // Remove the element if the timestamp of the first element is beyond
+ // the time range to look backward.
+ ImmutablePair<Timestamp, BigDecimal> pair = queue.remove();
+ bytesInQueue = bytesInQueue.subtract(pair.getRight());
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
index 8931c66c3c7..d17d691cdc8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTracker.java
@@ -27,10 +27,14 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import com.google.cloud.Timestamp;
import java.math.BigDecimal;
+import java.util.function.Supplier;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link RestrictionTracker} for claiming positions in a {@link TimestampRange} in a
@@ -45,12 +49,20 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Timestamp>
implements HasProgress {
+ private static final Logger LOG = LoggerFactory.getLogger(TimestampRangeTracker.class);
protected TimestampRange range;
protected @Nullable Timestamp lastAttemptedPosition;
protected @Nullable Timestamp lastClaimedPosition;
+ protected Supplier<Timestamp> timeSupplier;
public TimestampRangeTracker(TimestampRange range) {
this.range = checkNotNull(range);
+ this.timeSupplier = () -> Timestamp.now();
+ }
+
+ @VisibleForTesting
+ public void setTimeSupplier(Supplier<Timestamp> timeSupplier) {
+ this.timeSupplier = timeSupplier;
}
/**
@@ -182,43 +194,36 @@ public class TimestampRangeTracker extends RestrictionTracker<TimestampRange, Ti
}
/**
- * Returns the progress made within the restriction so far. This progress is returned in a
- * normalized fashion from the interval [0, 1]. Zero means no work indicates no work (completed or
- * remaining), while 1 indicates all work (completed or remaining).
+ * Returns the progress made within the restriction so far. If lastAttemptedPosition is null, the
+ * start of the range is used as the completed work; otherwise, lastAttemptedPosition will be
+ * used. The time gap between lastAttemptedPosition and now is used as the remaining work. In this
+ * way, when the time gap becomes large, we will have more backlog to process and we should add
+ * more resources.
*
- * <p>If no position was attempted, it will return {@code workCompleted} as 0 and {@code
- * workRemaining} as 1. If a position was attempted, it will return the fraction of work completed
- * and work remaining based on the offset the position represents in the restriction range. If the
- * last position attempted was greater than the end of the restriction range, it will return
- * {@code workCompleted} as 1 and {@code workRemaining} as 0.
- *
- * @return work completed and work remaining as fractions between [0, 1]
+ * @return work completed and work remaining in seconds.
*/
@Override
public Progress getProgress() {
- final BigDecimal fromInNanos = toNanos(range.getFrom());
- final BigDecimal toInNanos = toNanos(range.getTo());
- final BigDecimal totalWork = toInNanos.subtract(fromInNanos, DECIMAL128);
-
- if (lastAttemptedPosition == null) {
- final double workCompleted = 0D;
- final double workRemaining = 1D;
-
- return Progress.from(workCompleted, workRemaining);
+ final BigDecimal now = BigDecimal.valueOf(timeSupplier.get().getSeconds());
+ BigDecimal current;
+ if (lastClaimedPosition == null) {
+ current = BigDecimal.valueOf(range.getFrom().getSeconds());
} else {
- final BigDecimal currentInNanos = toNanos(lastAttemptedPosition);
- final BigDecimal workRemainingInNanos =
- toInNanos.subtract(currentInNanos, DECIMAL128).max(BigDecimal.ZERO);
-
- final double workCompleted =
- totalWork
- .subtract(workRemainingInNanos, DECIMAL128)
- .divide(totalWork, DECIMAL128)
- .doubleValue();
- final double workRemaining = workRemainingInNanos.divide(totalWork, DECIMAL128).doubleValue();
-
- return Progress.from(workCompleted, workRemaining);
+ current = BigDecimal.valueOf(lastClaimedPosition.getSeconds());
}
+ // The remaining work must be greater than 0. Otherwise, it will cause an issue
+ // that the watermark does not advance.
+ final BigDecimal workRemaining = now.subtract(current).max(BigDecimal.ONE);
+
+ LOG.debug(
+ "Reported progress - current:"
+ + current.doubleValue()
+ + " now:"
+ + now.doubleValue()
+ + " workRemaining:"
+ + workRemaining.doubleValue());
+
+ return Progress.from(current.doubleValue(), workRemaining.doubleValue());
}
@Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 38e00e519ac..4f83aa5e827 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRec
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -64,6 +65,7 @@ public class QueryChangeStreamActionTest {
private PartitionMetadataDao partitionMetadataDao;
private PartitionMetadata partition;
private ChangeStreamMetrics metrics;
+ private ThroughputEstimator throughputEstimator;
private TimestampRange restriction;
private RestrictionTracker<TimestampRange, Timestamp> restrictionTracker;
private OutputReceiver<DataChangeRecord> outputReceiver;
@@ -86,6 +88,7 @@ public class QueryChangeStreamActionTest {
heartbeatRecordAction = mock(HeartbeatRecordAction.class);
childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
metrics = mock(ChangeStreamMetrics.class);
+ throughputEstimator = mock(ThroughputEstimator.class);
action =
new QueryChangeStreamAction(
@@ -96,7 +99,8 @@ public class QueryChangeStreamActionTest {
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
- metrics);
+ metrics,
+ throughputEstimator);
final Struct row = mock(Struct.class);
partition =
PartitionMetadata.newBuilder()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 6aa5acb493a..6c791b706f3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ThroughputEstimator;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.TimestampRange;
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -79,6 +80,7 @@ public class ReadChangeStreamPartitionDoFnTest {
final DaoFactory daoFactory = mock(DaoFactory.class);
final MapperFactory mapperFactory = mock(MapperFactory.class);
final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
+ final ThroughputEstimator throughputEstimator = mock(ThroughputEstimator.class);
final ActionFactory actionFactory = mock(ActionFactory.class);
final PartitionMetadataDao partitionMetadataDao = mock(PartitionMetadataDao.class);
final ChangeStreamDao changeStreamDao = mock(ChangeStreamDao.class);
@@ -89,7 +91,9 @@ public class ReadChangeStreamPartitionDoFnTest {
childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
queryChangeStreamAction = mock(QueryChangeStreamAction.class);
- doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+ doFn =
+ new ReadChangeStreamPartitionDoFn(
+ daoFactory, mapperFactory, actionFactory, metrics, throughputEstimator);
partition =
PartitionMetadata.newBuilder()
@@ -126,7 +130,8 @@ public class ReadChangeStreamPartitionDoFnTest {
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
- metrics))
+ metrics,
+ throughputEstimator))
.thenReturn(queryChangeStreamAction);
doFn.setup();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
index 786ca52b3d0..202179bd915 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
@@ -47,7 +47,6 @@ import org.joda.time.Instant;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -85,7 +84,6 @@ public class SpannerChangeStreamIT {
pipeline.getOptions().as(ChangeStreamTestPipelineOptions.class).setBlockOnRun(false);
}
- @Ignore("BEAM-14277 Sickbay until autoscaling changes are merged")
@Test
public void testReadSpannerChangeStream() {
// Defines how many rows are going to be inserted / updated / deleted in the test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
index f1b6fbc3098..eb5b9e3ba15 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamTransactionBoundariesIT.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -84,7 +83,6 @@ public class SpannerChangeStreamTransactionBoundariesIT {
databaseClient = ENV.getDatabaseClient();
}
- @Ignore("BEAM-14277 Sickbay until autoscaling changes are merged")
@Test
public void testTransactionBoundaries() {
LOG.info("Test pipeline: " + pipeline.toString());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
new file mode 100644
index 00000000000..1c1282dd446
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ThroughputEstimatorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import java.math.BigDecimal;
+import java.math.MathContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.ImmutablePair;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests for {@link ThroughputEstimator}. */
+public class ThroughputEstimatorTest {
+ private static final double DELTA = 1e-10;
+ // Fixed seed so that the randomly generated test data, and therefore any failure, is
+ // reproducible from run to run.
+ private static final long SEED = 12164L;
+ private ThroughputEstimator estimator;
+ private Random random;
+
+ @Before
+ public void setup() {
+ estimator = new ThroughputEstimator();
+ random = new Random(SEED);
+ }
+
+ @Test
+ public void testThroughputCalculation() {
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(20, 0), 10);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(30, 0), 20);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(59, 0), 30);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 40); // Exclusive
+ assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(61, 0)), DELTA);
+
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(100, 0), 10);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 20);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(110, 0), 10);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(140, 0), 40); // Exclusive
+ assertEquals(20D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(141, 0)), DELTA);
+
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(201, 0), 10);
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(250, 0), 40); // Exclusive
+ assertEquals(10D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(261, 0)), DELTA);
+
+ assertEquals(0D, estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(350, 0)), DELTA);
+ }
+
+ @Test
+ public void testThroughputIsAccumulatedWithin60SecondsWindow() {
+ List<ImmutablePair<Timestamp, Long>> pairs = generateTestData(100, 0, 60, Long.MAX_VALUE);
+ pairs.sort((a, b) -> a.getLeft().compareTo(b.getLeft()));
+
+ final BigDecimal want = averagePerDistinctTimestamp(pairs);
+ for (ImmutablePair<Timestamp, Long> pair : pairs) {
+ estimator.update(pair.getLeft(), pair.getRight());
+ }
+
+ // This is needed to push the current window into the queue.
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(60, 0), 10);
+ double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(60, 0));
+ assertEquals(want.doubleValue(), actual, DELTA);
+ }
+
+ @Test
+ public void testThroughputIsAccumulatedWithin300SecondsWindow() {
+ List<ImmutablePair<Timestamp, Long>> excludedPairs =
+ generateTestData(300, 0, 240, Long.MAX_VALUE);
+ List<ImmutablePair<Timestamp, Long>> expectedPairs =
+ generateTestData(50, 240, 300, Long.MAX_VALUE);
+ List<ImmutablePair<Timestamp, Long>> pairs =
+ Stream.concat(excludedPairs.stream(), expectedPairs.stream()).collect(Collectors.toList());
+ pairs.sort((a, b) -> a.getLeft().compareTo(b.getLeft()));
+
+ // Only the pairs in [240, 300) are expected to contribute to the estimate.
+ final BigDecimal want = averagePerDistinctTimestamp(expectedPairs);
+ for (ImmutablePair<Timestamp, Long> pair : pairs) {
+ estimator.update(pair.getLeft(), pair.getRight());
+ }
+
+ // This is needed to push the current window into the queue.
+ estimator.update(Timestamp.ofTimeSecondsAndNanos(300, 0), 10);
+ double actual = estimator.getFrom(Timestamp.ofTimeSecondsAndNanos(300, 0));
+ assertEquals(want.doubleValue(), actual, DELTA);
+ }
+
+ /**
+ * Returns the total of the byte sizes in {@code pairs} divided by the number of distinct
+ * timestamps among them, computed with {@link MathContext#DECIMAL128} precision.
+ */
+ private static BigDecimal averagePerDistinctTimestamp(
+ List<ImmutablePair<Timestamp, Long>> pairs) {
+ final long count = pairs.stream().map(ImmutablePair::getLeft).distinct().count();
+ BigDecimal sum = BigDecimal.ZERO;
+ for (ImmutablePair<Timestamp, Long> pair : pairs) {
+ sum = sum.add(BigDecimal.valueOf(pair.getRight()));
+ }
+ return sum.divide(BigDecimal.valueOf(count), MathContext.DECIMAL128);
+ }
+
+ /**
+ * Generates {@code size} (timestamp, bytes) pairs using the seeded {@link #random}. Timestamps
+ * are whole seconds drawn from [startSeconds, endSeconds) and byte sizes from [0, maxBytes).
+ */
+ private List<ImmutablePair<Timestamp, Long>> generateTestData(
+ int size, int startSeconds, int endSeconds, long maxBytes) {
+ List<ImmutablePair<Timestamp, Long>> pairs = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ int seconds = random.nextInt(endSeconds - startSeconds) + startSeconds;
+ // floorMod keeps the value in [0, maxBytes); Random#nextLong(bound) needs Java 17+.
+ long bytes = Math.floorMod(random.nextLong(), maxBytes);
+ pairs.add(new ImmutablePair<>(Timestamp.ofTimeSecondsAndNanos(seconds, 0), bytes));
+ }
+ return pairs;
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
index f8a879fb690..5a016164849 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/TimestampRangeTrackerTest.java
@@ -41,7 +41,7 @@ import org.junit.runner.RunWith;
@RunWith(JUnitQuickcheck.class)
public class TimestampRangeTrackerTest {
- private static final double DELTA = 1e-15;
+ private static final double DELTA = 1e-10;
@Property
public void testTryClaimReturnsTrueWhenPositionIsWithinTheRange(
@@ -221,82 +221,92 @@ public class TimestampRangeTrackerTest {
}
@Property
- public void testGetProgressWorkCompletedAndWorkRemainingEqualsToOne(
+ public void testGetProgressWorkCompletedAndWorkRemaining(
@From(TimestampGenerator.class) Timestamp from,
@From(TimestampGenerator.class) Timestamp to,
@From(TimestampGenerator.class) Timestamp position) {
+ assumeThat(from, greaterThanOrEqualTo(Timestamp.ofTimeSecondsAndNanos(0, 0)));
assumeThat(from, lessThanOrEqualTo(to));
assumeThat(position, greaterThanOrEqualTo(from));
assumeThat(position, lessThan(to));
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+ // Progress is reported in seconds: work completed is the epoch second of the claimed
+ // position, and work remaining is its distance to "now", pinned here 10 seconds later.
+ tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds() + 10, 0));
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();
- assertEquals(1D, progress.getWorkCompleted() + progress.getWorkRemaining(), DELTA);
+ assertEquals(position.getSeconds(), progress.getWorkCompleted(), DELTA);
+ assertEquals(10D, progress.getWorkRemaining(), DELTA);
}
@Test
public void testGetProgressReturnsWorkRemainingAsWholeRangeWhenNoClaimWasAttempted() {
- final Timestamp from = Timestamp.MIN_VALUE;
+ final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
final Timestamp to = Timestamp.now();
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+ // With nothing claimed, progress is measured from the range start (0s). "Now" is pinned to
+ // the range end, so the work remaining equals the whole range in seconds.
+ tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
+
final Progress progress = tracker.getProgress();
assertEquals(0D, progress.getWorkCompleted(), DELTA);
- assertEquals(1D, progress.getWorkRemaining(), DELTA);
+ assertEquals(to.getSeconds(), progress.getWorkRemaining(), DELTA);
}
@Test
public void testGetProgressReturnsWorkRemainingAsRangeEndMinusAttemptedPosition() {
final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
- final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100);
- final Timestamp position = Timestamp.ofTimeSecondsAndNanos(0, 30);
+ final Timestamp to = Timestamp.ofTimeSecondsAndNanos(100, 0);
+ final Timestamp position = Timestamp.ofTimeSecondsAndNanos(30, 0);
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+ // "Now" is pinned to the range end (100s), so after claiming 30s the tracker is expected
+ // to report 30s of work completed and 70s of work remaining.
+ tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
+
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();
assertTrue(progress.getWorkCompleted() >= 0);
- assertEquals(0.3D, progress.getWorkCompleted(), DELTA);
+ assertEquals(30D, progress.getWorkCompleted(), DELTA);
assertTrue(progress.getWorkRemaining() >= 0);
- assertEquals(0.7D, progress.getWorkRemaining(), DELTA);
+ assertEquals(70D, progress.getWorkRemaining(), DELTA);
}
@Test
public void testGetProgressReturnsWorkCompletedAsOneWhenRangeEndHasBeenAttempted() {
final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
- final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100);
+ final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
- tracker.tryClaim(to);
+ // NOTE(review): the method name predates progress being reported in seconds; work
+ // completed is no longer 1. The expected values below (100s completed, 1s remaining)
+ // imply that the claim of the range end (101s) is not recorded, so progress is measured
+ // from the last successful claim at 100s up to the pinned "now" (101s).
+ tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(to.getSeconds(), 0));
+ tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(100, 0));
+ tracker.tryClaim(Timestamp.ofTimeSecondsAndNanos(101, 0));
final Progress progress = tracker.getProgress();
assertTrue(progress.getWorkCompleted() >= 0);
- assertEquals(1D, progress.getWorkCompleted(), DELTA);
+ assertEquals(100D, progress.getWorkCompleted(), DELTA);
assertTrue(progress.getWorkRemaining() >= 0);
- assertEquals(0D, progress.getWorkRemaining(), DELTA);
+ assertEquals(1D, progress.getWorkRemaining(), DELTA);
}
@Test
public void testGetProgressReturnsWorkCompletedAsOneWhenPastRangeEndHasBeenAttempted() {
final Timestamp from = Timestamp.ofTimeSecondsAndNanos(0, 0);
- final Timestamp to = Timestamp.ofTimeSecondsAndNanos(0, 100);
- final Timestamp position = Timestamp.ofTimeSecondsAndNanos(0, 101);
+ final Timestamp to = Timestamp.ofTimeSecondsAndNanos(101, 0);
+ final Timestamp position = Timestamp.ofTimeSecondsAndNanos(101, 0);
final TimestampRange range = TimestampRange.of(from, to);
final TimestampRangeTracker tracker = new TimestampRangeTracker(range);
+ // NOTE(review): despite the method name, the attempted position equals the range end
+ // rather than exceeding it, and work completed is no longer 1. The expected values imply
+ // the claim is not recorded, so progress is measured from the range start (0s) up to the
+ // pinned "now" (101s).
+ tracker.setTimeSupplier(() -> Timestamp.ofTimeSecondsAndNanos(position.getSeconds(), 0));
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();
assertTrue(progress.getWorkCompleted() >= 0);
- assertEquals(1D, progress.getWorkCompleted(), DELTA);
+ assertEquals(0D, progress.getWorkCompleted(), DELTA);
assertTrue(progress.getWorkRemaining() >= 0);
- assertEquals(0D, progress.getWorkRemaining(), DELTA);
+ assertEquals(101D, progress.getWorkRemaining(), DELTA);
}
}