You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/11/12 17:52:15 UTC
[beam] branch master updated: [BEAM-10114] Pub/Sub Lite
getSplitBacklog implementation (#12867)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 06fc5b03 [BEAM-10114] Pub/Sub Lite getSplitBacklog implementation (#12867)
06fc5b03 is described below
commit 06fc5b03f757863597bdac4be22388cb75049e34
Author: palmere-google <68...@users.noreply.github.com>
AuthorDate: Thu Nov 12 12:51:40 2020 -0500
[BEAM-10114] Pub/Sub Lite getSplitBacklog implementation (#12867)
* Pub/Sub Lite getSplitBacklog implementation
Add an implementation of getSplitBacklog to the Pub/Sub Lite IO. This
implementation depends on the Pub/Sub Lite TopicStats API.
* Add topic backlog reader settings
* Suppress nullness warnings
---
.../gcp/pubsublite/PubsubLiteUnboundedReader.java | 63 ++++++-
.../gcp/pubsublite/PubsubLiteUnboundedSource.java | 5 +-
.../sdk/io/gcp/pubsublite/SubscriberOptions.java | 85 ++++++----
.../sdk/io/gcp/pubsublite/TopicBacklogReader.java | 49 ++++++
.../io/gcp/pubsublite/TopicBacklogReaderImpl.java | 89 ++++++++++
.../gcp/pubsublite/TopicBacklogReaderSettings.java | 89 ++++++++++
.../pubsublite/PubsubLiteUnboundedReaderTest.java | 101 +++++++++++-
.../gcp/pubsublite/TopicBacklogReaderImplTest.java | 183 +++++++++++++++++++++
8 files changed, 630 insertions(+), 34 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java
index aeb365e..62a5448 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java
@@ -26,6 +26,7 @@ import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.PullSubscriber;
import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Timestamp;
@@ -41,14 +42,22 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** A reader for Pub/Sub Lite that generates a stream of SequencedMessages. */
@SuppressWarnings({
@@ -56,7 +65,10 @@ import org.joda.time.Instant;
})
class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
implements OffsetFinalizer {
+ private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteUnboundedReader.class);
private final UnboundedSource<SequencedMessage, ?> source;
+ private final TopicBacklogReader backlogReader;
+ private final LoadingCache<String, Long> backlogCache;
private final CloseableMonitor monitor = new CloseableMonitor();
@GuardedBy("monitor.monitor")
@@ -93,7 +105,18 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
}
public PubsubLiteUnboundedReader(
- UnboundedSource<SequencedMessage, ?> source, Map<Partition, SubscriberState> subscriberMap)
+ UnboundedSource<SequencedMessage, ?> source,
+ Map<Partition, SubscriberState> subscriberMap,
+ TopicBacklogReader backlogReader)
+ throws StatusException {
+ this(source, subscriberMap, backlogReader, Ticker.systemTicker());
+ }
+
+ PubsubLiteUnboundedReader(
+ UnboundedSource<SequencedMessage, ?> source,
+ Map<Partition, SubscriberState> subscriberMap,
+ TopicBacklogReader backlogReader,
+ Ticker ticker)
throws StatusException {
this.source = source;
this.subscriberMap = ImmutableMap.copyOf(subscriberMap);
@@ -105,9 +128,33 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
permanentError = Optional.of(permanentError.orElse(error));
}
});
+ this.backlogReader = backlogReader;
+ this.backlogCache =
+ CacheBuilder.newBuilder()
+ .ticker(ticker)
+ .maximumSize(1)
+ .expireAfterWrite(1, TimeUnit.MINUTES)
+ .refreshAfterWrite(10, TimeUnit.SECONDS)
+ .build(
+ new CacheLoader<Object, Long>() {
+ @Override
+ public Long load(Object val) throws InterruptedException, ExecutionException {
+ return computeSplitBacklog().get().getMessageBytes();
+ }
+ });
this.committerProxy.startAsync().awaitRunning();
}
+ private ApiFuture<ComputeMessageStatsResponse> computeSplitBacklog() {
+ ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
+ try (CloseableMonitor.Hold h = monitor.enter()) {
+ subscriberMap.forEach(
+ (partition, subscriberState) ->
+ subscriberState.lastDelivered.ifPresent(offset -> builder.put(partition, offset)));
+ }
+ return backlogReader.computeMessageStats(builder.build());
+ }
+
@Override
public void finalizeOffsets(Map<Partition, Offset> offsets) throws StatusException {
List<ApiFuture<Void>> commitFutures = new ArrayList<>();
@@ -266,6 +313,20 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
}
@Override
+ public long getSplitBacklogBytes() {
+ try {
+ // We use the cache because it allows us to coalesce request, periodically refresh the value
+ // and expire the value after a maximum staleness, but there is only ever one key.
+ return backlogCache.get("Backlog");
+ } catch (ExecutionException e) {
+ LOG.warn(
+ "Failed to retrieve backlog information, reporting the backlog size as UNKNOWN: {}",
+ e.getCause().getMessage());
+ return BACKLOG_UNKNOWN;
+ }
+ }
+
+ @Override
public UnboundedSource<SequencedMessage, ?> getCurrentSource() {
return source;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java
index 43bdaf4..d4496cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java
@@ -109,7 +109,10 @@ class PubsubLiteUnboundedSource extends UnboundedSource<SequencedMessage, Offset
}
statesBuilder.put(partition, state);
}
- return new PubsubLiteUnboundedReader(this, statesBuilder.build());
+ return new PubsubLiteUnboundedReader(
+ this,
+ statesBuilder.build(),
+ TopicBacklogReader.create(subscriberOptions.topicBacklogReaderSettings()));
} catch (StatusException e) {
throw new IOException(e);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
index 80bcc01..74d1036 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
@@ -33,13 +33,12 @@ import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberService
import io.grpc.StatusException;
import java.io.Serializable;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
-/** Options needed for a Pub/Sub Lite Subscriber. */
@AutoValue
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
@@ -50,19 +49,22 @@ public abstract class SubscriberOptions implements Serializable {
private static final Framework FRAMEWORK = Framework.of("BEAM");
// Required parameters.
- abstract SubscriptionPath subscriptionPath();
+ public abstract SubscriptionPath subscriptionPath();
- abstract FlowControlSettings flowControlSettings();
+ public abstract FlowControlSettings flowControlSettings();
// Optional parameters.
/** A set of partitions. If empty, retrieve the set of partitions using an admin client. */
- abstract Set<Partition> partitions();
+ public abstract Set<Partition> partitions();
+
+ /** The class used to read backlog for the subscription described by subscriptionPath(). */
+ public abstract TopicBacklogReaderSettings topicBacklogReaderSettings();
/** A supplier for the subscriber stub to be used. */
- abstract @Nullable SerializableSupplier<SubscriberServiceStub> subscriberStubSupplier();
+ public abstract @Nullable SerializableSupplier<SubscriberServiceStub> subscriberStubSupplier();
/** A supplier for the cursor service stub to be used. */
- abstract @Nullable SerializableSupplier<CursorServiceStub> committerStubSupplier();
+ public abstract @Nullable SerializableSupplier<CursorServiceStub> committerStubSupplier();
/**
* A factory to override subscriber creation entirely and delegate to another method. Primarily
@@ -87,21 +89,22 @@ public abstract class SubscriberOptions implements Serializable {
Map<Partition, SubscriberFactory> getSubscriberFactories() {
ImmutableMap.Builder<Partition, SubscriberFactory> factories = ImmutableMap.builder();
for (Partition partition : partitions()) {
- factories.put(
- partition,
- Optional.fromNullable(subscriberFactory())
- .or(
- consumer -> {
- SubscriberBuilder.Builder builder = SubscriberBuilder.newBuilder();
- builder.setMessageConsumer(consumer);
- builder.setSubscriptionPath(subscriptionPath());
- builder.setPartition(partition);
- builder.setContext(PubsubContext.of(FRAMEWORK));
- if (subscriberStubSupplier() != null) {
- builder.setSubscriberServiceStub(subscriberStubSupplier().get());
- }
- return builder.build();
- }));
+ SubscriberFactory factory = subscriberFactory();
+ if (factory == null) {
+ factory =
+ consumer -> {
+ SubscriberBuilder.Builder builder = SubscriberBuilder.newBuilder();
+ builder.setMessageConsumer(consumer);
+ builder.setSubscriptionPath(subscriptionPath());
+ builder.setPartition(partition);
+ builder.setContext(PubsubContext.of(FRAMEWORK));
+ if (subscriberStubSupplier() != null) {
+ builder.setSubscriberServiceStub(subscriberStubSupplier().get());
+ }
+ return builder.build();
+ };
+ }
+ factories.put(partition, factory);
}
return factories.build();
}
@@ -141,24 +144,44 @@ public abstract class SubscriberOptions implements Serializable {
public abstract Builder setCommitterStubSupplier(
SerializableSupplier<CursorServiceStub> stubSupplier);
+ public abstract Builder setTopicBacklogReaderSettings(
+ TopicBacklogReaderSettings topicBacklogReaderSettings);
+
+ // Used in unit tests
abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);
abstract Builder setCommitterSupplier(SerializableSupplier<Committer> committerSupplier);
+ // Used for implementing build();
+ abstract SubscriptionPath subscriptionPath();
+
+ abstract Set<Partition> partitions();
+
+ abstract Optional<TopicBacklogReaderSettings> topicBacklogReaderSettings();
+
abstract SubscriberOptions autoBuild();
+ @SuppressWarnings("CheckReturnValue")
public SubscriberOptions build() throws StatusException {
- SubscriberOptions built = autoBuild();
- if (!built.partitions().isEmpty()) {
- return built;
+ if (!partitions().isEmpty() && topicBacklogReaderSettings().isPresent()) {
+ return autoBuild();
+ }
+
+ if (partitions().isEmpty()) {
+ int partitionCount = PartitionLookupUtils.numPartitions(subscriptionPath());
+ ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
+ for (int i = 0; i < partitionCount; i++) {
+ partitions.add(Partition.of(i));
+ }
+ setPartitions(partitions.build());
}
- int partitionCount = PartitionLookupUtils.numPartitions(built.subscriptionPath());
- SubscriberOptions.Builder builder = built.toBuilder();
- ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
- for (int i = 0; i < partitionCount; i++) {
- partitions.add(Partition.of(i));
+ if (!topicBacklogReaderSettings().isPresent()) {
+ setTopicBacklogReaderSettings(
+ TopicBacklogReaderSettings.newBuilder()
+ .setTopicPathFromSubscriptionPath(subscriptionPath())
+ .build());
}
- return builder.setPartitions(partitions.build()).autoBuild();
+ return autoBuild();
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReader.java
new file mode 100644
index 0000000..37dd288
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import io.grpc.StatusException;
+import java.util.Map;
+
+/**
+ * The TopicBacklogReader is intended for clients who would like to use the TopicStats API to
+ * aggregate the backlog, or the distance between the current cursor and HEAD across multiple
+ * partitions within a subscription.
+ */
+public interface TopicBacklogReader {
+
+ /**
+ * Create a TopicBacklogReader from settings by delegating to {@code settings.instantiate()}.
+ *
+ * @param settings the settings used to instantiate the reader.
+ * @return the reader produced by the settings.
+ * @throws StatusException if {@code settings.instantiate()} fails.
+ */
+ static TopicBacklogReader create(TopicBacklogReaderSettings settings) throws StatusException {
+ return settings.instantiate();
+ }
+ /**
+ * Compute and aggregate message statistics for messages between the provided start offset and
+ * HEAD for each partition.
+ *
+ * @param subscriptionState A map from partition to the current offset of the subscriber in a
+ * given partition.
+ * @return a future with either an error or a ComputeMessageStatsResponse with the aggregated
+ * statistics for messages in the backlog on success.
+ */
+ ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
+ Map<Partition, Offset> subscriptionState);
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImpl.java
new file mode 100644
index 0000000..58e8f74
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Timestamps;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * A {@link TopicBacklogReader} that issues one {@code computeMessageStats} request per partition
+ * through a {@link TopicStatsClient} and merges the per-partition responses into a single
+ * aggregated response.
+ */
+final class TopicBacklogReaderImpl implements TopicBacklogReader {
+  private final TopicStatsClient client;
+  private final TopicPath topicPath;
+
+  /**
+   * @param client the client used to issue per-partition statistics requests.
+   * @param topicPath the topic whose partitions are queried.
+   */
+  public TopicBacklogReaderImpl(TopicStatsClient client, TopicPath topicPath) {
+    this.client = client;
+    this.topicPath = topicPath;
+  }
+
+  /**
+   * Returns the earlier of the two timestamps, treating an empty {@code t1} as "no value yet".
+   *
+   * @param t1 the running minimum, possibly empty.
+   * @param t2 the candidate timestamp.
+   * @return {@code t2} if {@code t1} is empty or later than {@code t2}; otherwise {@code t1}.
+   */
+  private static Optional<Timestamp> minTimestamp(Optional<Timestamp> t1, Timestamp t2) {
+    if (!t1.isPresent() || Timestamps.compare(t1.get(), t2) > 0) {
+      return Optional.of(t2);
+    }
+    return t1;
+  }
+
+  /**
+   * Requests message statistics for every partition in {@code subscriptionState} and combines
+   * them: byte and message counts are summed, and the minimum publish and event times are the
+   * earliest values reported by any partition (left unset if no partition reports one).
+   *
+   * @param subscriptionState a map from partition to the start offset for that partition.
+   * @return a future holding the aggregated response; it fails if any per-partition request
+   *     fails.
+   */
+  @Override
+  @SuppressWarnings("dereference.of.nullable")
+  public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
+      Map<Partition, Offset> subscriptionState) {
+    // NOTE(review): the end offset is capped at Integer.MAX_VALUE although offsets look like
+    // longs elsewhere -- confirm whether Long.MAX_VALUE is the intended "HEAD" sentinel.
+    List<ApiFuture<ComputeMessageStatsResponse>> perPartitionFutures =
+        subscriptionState.entrySet().stream()
+            .map(
+                e ->
+                    client.computeMessageStats(
+                        topicPath, e.getKey(), e.getValue(), Offset.of(Integer.MAX_VALUE)))
+            .collect(Collectors.toList());
+    return ApiFutures.transform(
+        ApiFutures.allAsList(perPartitionFutures),
+        responses -> {
+          Optional<Timestamp> minPublishTime = Optional.empty();
+          Optional<Timestamp> minEventTime = Optional.empty();
+          long messageBytes = 0;
+          long messageCount = 0;
+          for (ComputeMessageStatsResponse response : responses) {
+            messageBytes += response.getMessageBytes();
+            messageCount += response.getMessageCount();
+            if (response.hasMinimumPublishTime()) {
+              minPublishTime = minTimestamp(minPublishTime, response.getMinimumPublishTime());
+            }
+            if (response.hasMinimumEventTime()) {
+              // Fold into the running minimum *event* time; comparing against the publish-time
+              // minimum would mix the two statistics.
+              minEventTime = minTimestamp(minEventTime, response.getMinimumEventTime());
+            }
+          }
+          ComputeMessageStatsResponse.Builder builder =
+              ComputeMessageStatsResponse.newBuilder()
+                  .setMessageBytes(messageBytes)
+                  .setMessageCount(messageCount);
+          minPublishTime.ifPresent(builder::setMinimumPublishTime);
+          minEventTime.ifPresent(builder::setMinimumEventTime);
+          return builder.build();
+        },
+        MoreExecutors.directExecutor());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
new file mode 100644
index 0000000..07120fe
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
+import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub;
+import io.grpc.StatusException;
+import java.io.Serializable;
+import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Serializable settings from which a {@link TopicBacklogReader} can be instantiated. */
+@AutoValue
+public abstract class TopicBacklogReaderSettings implements Serializable {
+  private static final long serialVersionUID = -4001752066450248673L;
+
+  /**
+   * The topic path for this backlog reader. It is set either directly through {@link
+   * Builder#setTopicPath} or by resolving a subscription's topic through {@link
+   * Builder#setTopicPathFromSubscriptionPath}.
+   */
+  abstract TopicPath topicPath();
+
+  // Optional parameters
+  /** An optional supplier of the stub handed to the topic stats client; may be null. */
+  abstract @Nullable SerializableSupplier<TopicStatsServiceBlockingStub> stub();
+
+  /** Returns a new builder with no values set. */
+  public static Builder newBuilder() {
+    return new AutoValue_TopicBacklogReaderSettings.Builder();
+  }
+
+  /** Builder for {@link TopicBacklogReaderSettings}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    // Required parameters.
+    public abstract Builder setTopicPath(TopicPath topicPath);
+
+    /**
+     * Looks up the subscription with an admin client created for the subscription's region and
+     * sets the topic path to the topic that subscription reports.
+     *
+     * @param subscriptionPath the subscription whose topic should be used.
+     * @return this builder with the topic path set.
+     * @throws StatusException if the lookup fails, is interrupted, or the client cannot be
+     *     created or closed.
+     */
+    @SuppressWarnings("argument.type.incompatible")
+    public Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath)
+        throws StatusException {
+      try (AdminClient adminClient =
+          AdminClient.create(
+              AdminClientSettings.newBuilder()
+                  .setRegion(subscriptionPath.location().region())
+                  .build())) {
+        return setTopicPath(
+            TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so callers further up the stack can still observe it.
+        Thread.currentThread().interrupt();
+        throw ExtractStatus.toCanonical(e);
+      } catch (ExecutionException e) {
+        // Unwrap so the status reflects the underlying failure rather than the wrapper.
+        throw ExtractStatus.toCanonical(e.getCause());
+      } catch (Throwable t) {
+        throw ExtractStatus.toCanonical(t);
+      }
+    }
+
+    public abstract Builder setStub(SerializableSupplier<TopicStatsServiceBlockingStub> stub);
+
+    public abstract TopicBacklogReaderSettings build();
+  }
+
+  /**
+   * Creates a {@link TopicBacklogReader} backed by a topic stats client configured with the
+   * region of {@link #topicPath()} and, when one is provided, the supplied stub.
+   *
+   * @return a new backlog reader for {@link #topicPath()}.
+   * @throws StatusException declared by this method; raised by the calls it makes when client
+   *     construction fails.
+   */
+  @SuppressWarnings("CheckReturnValue")
+  TopicBacklogReader instantiate() throws StatusException {
+    TopicStatsClientSettings.Builder builder = TopicStatsClientSettings.newBuilder();
+    SerializableSupplier<TopicStatsServiceBlockingStub> stub = stub();
+    if (stub != null) {
+      builder.setStub(stub.get());
+    }
+    builder.setRegion(topicPath().location().region());
+    return new TopicBacklogReaderImpl(TopicStatsClient.create(builder.build()), topicPath());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java
index a40ba9f..6c652c4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java
@@ -35,10 +35,14 @@ import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.FakeApiService;
import com.google.cloud.pubsublite.internal.PullSubscriber;
import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.protobuf.Duration;
import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
+import io.grpc.Status;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,6 +53,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteUnboundedReader.SubscriberState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.Instant;
@@ -70,12 +75,33 @@ public class PubsubLiteUnboundedReaderTest {
abstract static class CommitterFakeService extends FakeApiService implements Committer {}
+ private static class FakeTicker extends Ticker {
+ private Timestamp time;
+
+ FakeTicker(Timestamp start) {
+ time = start;
+ }
+
+ @Override
+ public long read() {
+ return Timestamps.toNanos(time);
+ }
+
+ public void advance(Duration duration) {
+ time = Timestamps.add(time, duration);
+ }
+ }
+
@Spy private CommitterFakeService committer5;
@Spy private CommitterFakeService committer8;
@SuppressWarnings("unchecked")
private final UnboundedSource<SequencedMessage, ?> source = mock(UnboundedSource.class);
+ @Mock private TopicBacklogReader backlogReader;
+
+ private final FakeTicker ticker = new FakeTicker(Timestamps.fromSeconds(450));
+
private final PubsubLiteUnboundedReader reader;
private static SequencedMessage exampleMessage(Offset offset, Timestamp publishTime) {
@@ -104,7 +130,10 @@ public class PubsubLiteUnboundedReaderTest {
state8.committer = committer8;
reader =
new PubsubLiteUnboundedReader(
- source, ImmutableMap.of(Partition.of(5), state5, Partition.of(8), state8));
+ source,
+ ImmutableMap.of(Partition.of(5), state5, Partition.of(8), state8),
+ backlogReader,
+ ticker);
}
@Test
@@ -228,4 +257,74 @@ public class PubsubLiteUnboundedReaderTest {
mark.finalizeCheckpoint();
verify(committer5).commitOffset(Offset.of(10));
}
+
+ /** A failed stats request must surface as BACKLOG_UNKNOWN rather than an exception. */
+ @Test
+ public void splitBacklogBytes_returnsUnknownBacklogOnError() throws Exception {
+   when(backlogReader.computeMessageStats(ImmutableMap.of()))
+       .thenReturn(ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
+   // Actual value first, expected value inside the matcher, so failure messages read correctly.
+   assertThat(reader.getSplitBacklogBytes(), equalTo(PubsubLiteUnboundedReader.BACKLOG_UNKNOWN));
+ }
+
+ /** The reported backlog is the message-bytes field of the backlog reader's response. */
+ @Test
+ public void splitBacklogBytes_computesBacklog() throws Exception {
+   ComputeMessageStatsResponse response =
+       ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+   when(backlogReader.computeMessageStats(ImmutableMap.of()))
+       .thenReturn(ApiFutures.immediateFuture(response));
+   // Actual value first, expected value inside the matcher, so failure messages read correctly.
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response.getMessageBytes()));
+ }
+
+ /** The cached value is reused for ten seconds and refreshed on the first read after that. */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void splitBacklogBytes_computesBacklogOncePerTenSeconds() throws Exception {
+   ComputeMessageStatsResponse response1 =
+       ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+   ComputeMessageStatsResponse response2 =
+       ComputeMessageStatsResponse.newBuilder().setMessageBytes(50).build();
+
+   when(backlogReader.computeMessageStats(ImmutableMap.of()))
+       .thenReturn(ApiFutures.immediateFuture(response1), ApiFutures.immediateFuture(response2));
+
+   // Actual value first, expected value inside the matcher, so failure messages read correctly.
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response1.getMessageBytes()));
+   // Exactly ten seconds later the first response is still served.
+   ticker.advance(Durations.fromSeconds(10));
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response1.getMessageBytes()));
+   // One more second crosses the refresh interval, so the second response is returned.
+   ticker.advance(Durations.fromSeconds(1));
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response2.getMessageBytes()));
+ }
+
+ /** Once refreshes keep failing, the stale value is dropped after one minute. */
+ @SuppressWarnings("unchecked")
+ @Test
+ public void splitBacklogBytes_oldValueExpiresAfterOneMinute() throws Exception {
+   ComputeMessageStatsResponse response =
+       ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+
+   when(backlogReader.computeMessageStats(ImmutableMap.of()))
+       .thenReturn(
+           ApiFutures.immediateFuture(response),
+           ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
+
+   // Actual value first, expected value inside the matcher, so failure messages read correctly.
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response.getMessageBytes()));
+   // Thirty seconds in, the refresh fails but the previously loaded value is still served.
+   ticker.advance(Durations.fromSeconds(30));
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response.getMessageBytes()));
+   // Past one minute the old value has expired, so the failure is reported as unknown.
+   ticker.advance(Durations.fromSeconds(31));
+   assertThat(reader.getSplitBacklogBytes(), equalTo(PubsubLiteUnboundedReader.BACKLOG_UNKNOWN));
+ }
+
+ /** The stats request carries the offsets of the messages pulled from each partition. */
+ @Test
+ public void splitBacklogBytes_usesCorrectCursorValues() throws Exception {
+   SequencedMessage message1 = exampleMessage(Offset.of(10), randomMilliAllignedTimestamp());
+   SequencedMessage message2 = exampleMessage(Offset.of(888), randomMilliAllignedTimestamp());
+   ComputeMessageStatsResponse response =
+       ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+
+   when(subscriber5.pull()).thenReturn(ImmutableList.of(message1));
+   when(subscriber8.pull()).thenReturn(ImmutableList.of(message2));
+   // Only a request with exactly these per-partition offsets is stubbed with the response.
+   when(backlogReader.computeMessageStats(
+           ImmutableMap.of(Partition.of(5), Offset.of(10), Partition.of(8), Offset.of(888))))
+       .thenReturn(ApiFutures.immediateFuture(response));
+
+   assertTrue(reader.start());
+   assertTrue(reader.advance());
+   // Actual value first, expected value inside the matcher, so failure messages read correctly.
+   assertThat(reader.getSplitBacklogBytes(), equalTo(response.getMessageBytes()));
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImplTest.java
new file mode 100644
index 0000000..090f723
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImplTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.CloudZone;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.ProjectNumber;
+import com.google.cloud.pubsublite.TopicName;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.protobuf.Timestamp;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.StatusException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+/**
+ * Unit tests for {@link TopicBacklogReaderImpl}, run against a mocked {@link TopicStatsClient}.
+ *
+ * <p>Each test stubs the per-partition {@code computeMessageStats} calls on the mock client and
+ * then checks the single future returned by {@link TopicBacklogReader#computeMessageStats}.
+ */
+@SuppressWarnings("uninitialized")
+@RunWith(JUnit4.class)
+public final class TopicBacklogReaderImplTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock TopicStatsClient mockClient;
+
+  // Assigned in setUp() before every test rather than in a constructor.
+  private TopicPath topicPath;
+  private TopicBacklogReader reader;
+
+  /** Builds the topic path used by all stubs and the reader under test around the mock client. */
+  @Before
+  public void setUp() throws Exception {
+    this.topicPath =
+        TopicPath.newBuilder()
+            .setProject(ProjectNumber.of(4))
+            .setName(TopicName.of("test"))
+            .setLocation(CloudZone.parse("us-central1-b"))
+            .build();
+    this.reader = new TopicBacklogReaderImpl(mockClient, topicPath);
+  }
+
+  /**
+   * When the stats call for one partition fails, the combined future is expected to fail with
+   * the same status code even though the other partition succeeded.
+   */
+  @SuppressWarnings("incompatible")
+  @Test
+  public void computeMessageStats_partialFailure() throws Exception {
+    ComputeMessageStatsResponse partition1 = ComputeMessageStatsResponse.getDefaultInstance();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
+
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    // Future.get() wraps the failure, so the status is extracted from the cause.
+    ExecutionException ex = assertThrows(ExecutionException.class, future::get);
+    assertEquals(Code.UNAVAILABLE, ExtractStatus.extract(ex.getCause()).get().getCode());
+  }
+
+  /** Two default (empty) per-partition responses are expected to yield a default aggregate. */
+  @Test
+  public void computeMessageStats_aggregatesEmptyMessages() throws Exception {
+    ComputeMessageStatsResponse partition1 = ComputeMessageStatsResponse.getDefaultInstance();
+    ComputeMessageStatsResponse partition2 = ComputeMessageStatsResponse.getDefaultInstance();
+    ComputeMessageStatsResponse aggregate = ComputeMessageStatsResponse.getDefaultInstance();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition2));
+
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    // JUnit convention: expected value first, actual value second.
+    assertEquals(aggregate, future.get());
+  }
+
+  /**
+   * A minimum publish time set only on one partition and a minimum event time set only on the
+   * other are both expected to appear in the aggregate.
+   */
+  @Test
+  public void computeMessageStats_timestampsAggregatedWhenPresent() throws Exception {
+    Timestamp minEventTime = Timestamp.newBuilder().setSeconds(1000).setNanos(10).build();
+    Timestamp minPublishTime = Timestamp.newBuilder().setSeconds(1001).setNanos(11).build();
+    ComputeMessageStatsResponse partition1 =
+        ComputeMessageStatsResponse.newBuilder().setMinimumPublishTime(minPublishTime).build();
+    ComputeMessageStatsResponse partition2 =
+        ComputeMessageStatsResponse.newBuilder().setMinimumEventTime(minEventTime).build();
+    ComputeMessageStatsResponse aggregate =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition2));
+
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    // JUnit convention: expected value first, actual value second.
+    assertEquals(aggregate, future.get());
+  }
+
+  /**
+   * With fully populated responses, message counts and bytes are expected to be summed and each
+   * minimum timestamp is expected to be the earlier of the two partitions' values.
+   */
+  @Test
+  public void computeMessageStats_resultsAggregated() throws Exception {
+    Timestamp minEventTime = Timestamp.newBuilder().setSeconds(1000).setNanos(10).build();
+    Timestamp minPublishTime = Timestamp.newBuilder().setSeconds(1001).setNanos(11).build();
+    // Partition 1 has the later event time (1002s) but the earlier publish time.
+    ComputeMessageStatsResponse partition1 =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(10)
+            .setMessageBytes(100)
+            .setMinimumEventTime(minEventTime.toBuilder().setSeconds(1002).build())
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+    // Partition 2 has the earlier event time but the later publish time (12 vs 11 nanos).
+    ComputeMessageStatsResponse partition2 =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(20)
+            .setMessageBytes(200)
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime.toBuilder().setNanos(12).build())
+            .build();
+    ComputeMessageStatsResponse aggregate =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(30)
+            .setMessageBytes(300)
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition2));
+
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    // JUnit convention: expected value first, actual value second.
+    assertEquals(aggregate, future.get());
+  }
+}