You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2020/11/12 17:52:15 UTC

[beam] branch master updated: [BEAM-10114] Pub/Sub Lite getSplitBacklog implementation (#12867)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06fc5b03 [BEAM-10114] Pub/Sub Lite getSplitBacklog implementation (#12867)
06fc5b03 is described below

commit 06fc5b03f757863597bdac4be22388cb75049e34
Author: palmere-google <68...@users.noreply.github.com>
AuthorDate: Thu Nov 12 12:51:40 2020 -0500

    [BEAM-10114] Pub/Sub Lite getSplitBacklog implementation (#12867)
    
    * Pub/Sub Lite getSplitBacklog implementation
    
    Add an implementation of getSplitBacklog to the Pub/Sub Lite IO. This
    implementation depends on the pub/sub lite TopicStats api
    
    * Add topic backlog reader settings
    
    * Suppress nullness warnings
---
 .../gcp/pubsublite/PubsubLiteUnboundedReader.java  |  63 ++++++-
 .../gcp/pubsublite/PubsubLiteUnboundedSource.java  |   5 +-
 .../sdk/io/gcp/pubsublite/SubscriberOptions.java   |  85 ++++++----
 .../sdk/io/gcp/pubsublite/TopicBacklogReader.java  |  49 ++++++
 .../io/gcp/pubsublite/TopicBacklogReaderImpl.java  |  89 ++++++++++
 .../gcp/pubsublite/TopicBacklogReaderSettings.java |  89 ++++++++++
 .../pubsublite/PubsubLiteUnboundedReaderTest.java  | 101 +++++++++++-
 .../gcp/pubsublite/TopicBacklogReaderImplTest.java | 183 +++++++++++++++++++++
 8 files changed, 630 insertions(+), 34 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java
index aeb365e..62a5448 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.java
@@ -26,6 +26,7 @@ import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.ProxyService;
 import com.google.cloud.pubsublite.internal.PullSubscriber;
 import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.protobuf.Timestamp;
@@ -41,14 +42,22 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A reader for Pub/Sub Lite that generates a stream of SequencedMessages. */
 @SuppressWarnings({
@@ -56,7 +65,10 @@ import org.joda.time.Instant;
 })
 class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
     implements OffsetFinalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteUnboundedReader.class);
   private final UnboundedSource<SequencedMessage, ?> source;
+  private final TopicBacklogReader backlogReader;
+  private final LoadingCache<String, Long> backlogCache;
   private final CloseableMonitor monitor = new CloseableMonitor();
 
   @GuardedBy("monitor.monitor")
@@ -93,7 +105,18 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
   }
 
   public PubsubLiteUnboundedReader(
-      UnboundedSource<SequencedMessage, ?> source, Map<Partition, SubscriberState> subscriberMap)
+      UnboundedSource<SequencedMessage, ?> source,
+      Map<Partition, SubscriberState> subscriberMap,
+      TopicBacklogReader backlogReader)
+      throws StatusException {
+    this(source, subscriberMap, backlogReader, Ticker.systemTicker());
+  }
+
+  PubsubLiteUnboundedReader(
+      UnboundedSource<SequencedMessage, ?> source,
+      Map<Partition, SubscriberState> subscriberMap,
+      TopicBacklogReader backlogReader,
+      Ticker ticker)
       throws StatusException {
     this.source = source;
     this.subscriberMap = ImmutableMap.copyOf(subscriberMap);
@@ -105,9 +128,33 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
                 permanentError = Optional.of(permanentError.orElse(error));
               }
             });
+    this.backlogReader = backlogReader;
+    this.backlogCache =
+        CacheBuilder.newBuilder()
+            .ticker(ticker)
+            .maximumSize(1)
+            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .refreshAfterWrite(10, TimeUnit.SECONDS)
+            .build(
+                new CacheLoader<Object, Long>() {
+                  @Override
+                  public Long load(Object val) throws InterruptedException, ExecutionException {
+                    return computeSplitBacklog().get().getMessageBytes();
+                  }
+                });
     this.committerProxy.startAsync().awaitRunning();
   }
 
+  private ApiFuture<ComputeMessageStatsResponse> computeSplitBacklog() {
+    ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
+    try (CloseableMonitor.Hold h = monitor.enter()) {
+      subscriberMap.forEach(
+          (partition, subscriberState) ->
+              subscriberState.lastDelivered.ifPresent(offset -> builder.put(partition, offset)));
+    }
+    return backlogReader.computeMessageStats(builder.build());
+  }
+
   @Override
   public void finalizeOffsets(Map<Partition, Offset> offsets) throws StatusException {
     List<ApiFuture<Void>> commitFutures = new ArrayList<>();
@@ -266,6 +313,20 @@ class PubsubLiteUnboundedReader extends UnboundedReader<SequencedMessage>
   }
 
   @Override
+  public long getSplitBacklogBytes() {
+    try {
+      // We use the cache because it allows us to coalesce requests, periodically refresh the value
+      // and expire the value after a maximum staleness, but there is only ever one key.
+      return backlogCache.get("Backlog");
+    } catch (ExecutionException e) {
+      LOG.warn(
+          "Failed to retrieve backlog information, reporting the backlog size as UNKNOWN: {}",
+          e.getCause().getMessage());
+      return BACKLOG_UNKNOWN;
+    }
+  }
+
+  @Override
   public UnboundedSource<SequencedMessage, ?> getCurrentSource() {
     return source;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java
index 43bdaf4..d4496cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedSource.java
@@ -109,7 +109,10 @@ class PubsubLiteUnboundedSource extends UnboundedSource<SequencedMessage, Offset
         }
         statesBuilder.put(partition, state);
       }
-      return new PubsubLiteUnboundedReader(this, statesBuilder.build());
+      return new PubsubLiteUnboundedReader(
+          this,
+          statesBuilder.build(),
+          TopicBacklogReader.create(subscriberOptions.topicBacklogReaderSettings()));
     } catch (StatusException e) {
       throw new IOException(e);
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
index 80bcc01..74d1036 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
@@ -33,13 +33,12 @@ import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberService
 import io.grpc.StatusException;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-/** Options needed for a Pub/Sub Lite Subscriber. */
 @AutoValue
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
@@ -50,19 +49,22 @@ public abstract class SubscriberOptions implements Serializable {
   private static final Framework FRAMEWORK = Framework.of("BEAM");
 
   // Required parameters.
-  abstract SubscriptionPath subscriptionPath();
+  public abstract SubscriptionPath subscriptionPath();
 
-  abstract FlowControlSettings flowControlSettings();
+  public abstract FlowControlSettings flowControlSettings();
 
   // Optional parameters.
   /** A set of partitions. If empty, retrieve the set of partitions using an admin client. */
-  abstract Set<Partition> partitions();
+  public abstract Set<Partition> partitions();
+
+  /** The settings used to read backlog for the subscription described by subscriptionPath(). */
+  public abstract TopicBacklogReaderSettings topicBacklogReaderSettings();
 
   /** A supplier for the subscriber stub to be used. */
-  abstract @Nullable SerializableSupplier<SubscriberServiceStub> subscriberStubSupplier();
+  public abstract @Nullable SerializableSupplier<SubscriberServiceStub> subscriberStubSupplier();
 
   /** A supplier for the cursor service stub to be used. */
-  abstract @Nullable SerializableSupplier<CursorServiceStub> committerStubSupplier();
+  public abstract @Nullable SerializableSupplier<CursorServiceStub> committerStubSupplier();
 
   /**
    * A factory to override subscriber creation entirely and delegate to another method. Primarily
@@ -87,21 +89,22 @@ public abstract class SubscriberOptions implements Serializable {
   Map<Partition, SubscriberFactory> getSubscriberFactories() {
     ImmutableMap.Builder<Partition, SubscriberFactory> factories = ImmutableMap.builder();
     for (Partition partition : partitions()) {
-      factories.put(
-          partition,
-          Optional.fromNullable(subscriberFactory())
-              .or(
-                  consumer -> {
-                    SubscriberBuilder.Builder builder = SubscriberBuilder.newBuilder();
-                    builder.setMessageConsumer(consumer);
-                    builder.setSubscriptionPath(subscriptionPath());
-                    builder.setPartition(partition);
-                    builder.setContext(PubsubContext.of(FRAMEWORK));
-                    if (subscriberStubSupplier() != null) {
-                      builder.setSubscriberServiceStub(subscriberStubSupplier().get());
-                    }
-                    return builder.build();
-                  }));
+      SubscriberFactory factory = subscriberFactory();
+      if (factory == null) {
+        factory =
+            consumer -> {
+              SubscriberBuilder.Builder builder = SubscriberBuilder.newBuilder();
+              builder.setMessageConsumer(consumer);
+              builder.setSubscriptionPath(subscriptionPath());
+              builder.setPartition(partition);
+              builder.setContext(PubsubContext.of(FRAMEWORK));
+              if (subscriberStubSupplier() != null) {
+                builder.setSubscriberServiceStub(subscriberStubSupplier().get());
+              }
+              return builder.build();
+            };
+      }
+      factories.put(partition, factory);
     }
     return factories.build();
   }
@@ -141,24 +144,44 @@ public abstract class SubscriberOptions implements Serializable {
     public abstract Builder setCommitterStubSupplier(
         SerializableSupplier<CursorServiceStub> stubSupplier);
 
+    public abstract Builder setTopicBacklogReaderSettings(
+        TopicBacklogReaderSettings topicBacklogReaderSettings);
+
+    // Used in unit tests
     abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);
 
     abstract Builder setCommitterSupplier(SerializableSupplier<Committer> committerSupplier);
 
+    // Used for implementing build();
+    abstract SubscriptionPath subscriptionPath();
+
+    abstract Set<Partition> partitions();
+
+    abstract Optional<TopicBacklogReaderSettings> topicBacklogReaderSettings();
+
     abstract SubscriberOptions autoBuild();
 
+    @SuppressWarnings("CheckReturnValue")
     public SubscriberOptions build() throws StatusException {
-      SubscriberOptions built = autoBuild();
-      if (!built.partitions().isEmpty()) {
-        return built;
+      if (!partitions().isEmpty() && topicBacklogReaderSettings().isPresent()) {
+        return autoBuild();
+      }
+
+      if (partitions().isEmpty()) {
+        int partitionCount = PartitionLookupUtils.numPartitions(subscriptionPath());
+        ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
+        for (int i = 0; i < partitionCount; i++) {
+          partitions.add(Partition.of(i));
+        }
+        setPartitions(partitions.build());
       }
-      int partitionCount = PartitionLookupUtils.numPartitions(built.subscriptionPath());
-      SubscriberOptions.Builder builder = built.toBuilder();
-      ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
-      for (int i = 0; i < partitionCount; i++) {
-        partitions.add(Partition.of(i));
+      if (!topicBacklogReaderSettings().isPresent()) {
+        setTopicBacklogReaderSettings(
+            TopicBacklogReaderSettings.newBuilder()
+                .setTopicPathFromSubscriptionPath(subscriptionPath())
+                .build());
       }
-      return builder.setPartitions(partitions.build()).autoBuild();
+      return autoBuild();
     }
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReader.java
new file mode 100644
index 0000000..37dd288
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.api.core.ApiFuture;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import io.grpc.StatusException;
+import java.util.Map;
+
+/**
+ * The TopicBacklogReader is intended for clients who would like to use the TopicStats API to
+ * aggregate the backlog, or the distance between the current cursor and HEAD across multiple
+ * partitions within a subscription.
+ */
+public interface TopicBacklogReader {
+
+  /** Create a TopicBacklogReader from settings. */
+  static TopicBacklogReader create(TopicBacklogReaderSettings settings) throws StatusException {
+    return settings.instantiate();
+  }
+  /**
+   * Compute and aggregate message statistics for messages between the provided start offset and
+   * HEAD for each partition.
+   *
+   * @param subscriptionState A map from partition to the current offset of the subscriber in a
+   *     given partition.
+   * @return a future with either an error or a ComputeMessageStatsResponse with the aggregated
+   *     statistics for messages in the backlog on success.
+   */
+  ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
+      Map<Partition, Offset> subscriptionState);
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImpl.java
new file mode 100644
index 0000000..58e8f74
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Timestamps;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
+
+final class TopicBacklogReaderImpl implements TopicBacklogReader {
+  private final TopicStatsClient client;
+  private final TopicPath topicPath;
+
+  public TopicBacklogReaderImpl(TopicStatsClient client, TopicPath topicPath) {
+    this.client = client;
+    this.topicPath = topicPath;
+  }
+
+  private static Optional<Timestamp> minTimestamp(Optional<Timestamp> t1, Timestamp t2) {
+    if (!t1.isPresent() || Timestamps.compare(t1.get(), t2) > 0) {
+      return Optional.of(t2);
+    }
+    return t1;
+  }
+
+  @Override
+  @SuppressWarnings("dereference.of.nullable")
+  public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
+      Map<Partition, Offset> subscriptionState) {
+    List<ApiFuture<ComputeMessageStatsResponse>> perPartitionFutures =
+        subscriptionState.entrySet().stream()
+            .map(
+                e ->
+                    client.computeMessageStats(
+                        topicPath, e.getKey(), e.getValue(), Offset.of(Integer.MAX_VALUE)))
+            .collect(Collectors.toList());
+    return ApiFutures.transform(
+        ApiFutures.allAsList(perPartitionFutures),
+        responses -> {
+          Optional<Timestamp> minPublishTime = Optional.empty();
+          Optional<Timestamp> minEventTime = Optional.empty();
+          long messageBytes = 0;
+          long messageCount = 0;
+          for (ComputeMessageStatsResponse response : responses) {
+            messageBytes += response.getMessageBytes();
+            messageCount += response.getMessageCount();
+            if (response.hasMinimumPublishTime()) {
+              minPublishTime = minTimestamp(minPublishTime, response.getMinimumPublishTime());
+            }
+            if (response.hasMinimumEventTime()) {
+              minEventTime = minTimestamp(minEventTime, response.getMinimumEventTime());
+            }
+          }
+          ComputeMessageStatsResponse.Builder builder =
+              ComputeMessageStatsResponse.newBuilder()
+                  .setMessageBytes(messageBytes)
+                  .setMessageCount(messageCount);
+          minPublishTime.ifPresent(builder::setMinimumPublishTime);
+          minEventTime.ifPresent(builder::setMinimumEventTime);
+          return builder.build();
+        },
+        MoreExecutors.directExecutor());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
new file mode 100644
index 0000000..07120fe
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
+import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub;
+import io.grpc.StatusException;
+import java.io.Serializable;
+import java.util.concurrent.ExecutionException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@AutoValue
+public abstract class TopicBacklogReaderSettings implements Serializable {
+  private static final long serialVersionUID = -4001752066450248673L;
+
+  /**
+   * The topic path for this backlog reader. Set it with setTopicPath, or resolve it from a
+   * subscription with setTopicPathFromSubscriptionPath, which delegates to setTopicPath.
+   */
+  abstract TopicPath topicPath();
+
+  // Optional parameters
+  abstract @Nullable SerializableSupplier<TopicStatsServiceBlockingStub> stub();
+
+  public static Builder newBuilder() {
+    return new AutoValue_TopicBacklogReaderSettings.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    // Required parameters.
+    public abstract Builder setTopicPath(TopicPath topicPath);
+
+    @SuppressWarnings("argument.type.incompatible")
+    public Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath)
+        throws StatusException {
+      try (AdminClient adminClient =
+          AdminClient.create(
+              AdminClientSettings.newBuilder()
+                  .setRegion(subscriptionPath.location().region())
+                  .build())) {
+        return setTopicPath(
+            TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
+      } catch (ExecutionException e) {
+        throw ExtractStatus.toCanonical(e.getCause());
+      } catch (Throwable t) {
+        throw ExtractStatus.toCanonical(t);
+      }
+    }
+
+    public abstract Builder setStub(SerializableSupplier<TopicStatsServiceBlockingStub> stub);
+
+    public abstract TopicBacklogReaderSettings build();
+  }
+
+  @SuppressWarnings("CheckReturnValue")
+  TopicBacklogReader instantiate() throws StatusException {
+    TopicStatsClientSettings.Builder builder = TopicStatsClientSettings.newBuilder();
+    SerializableSupplier<TopicStatsServiceBlockingStub> stub = stub();
+    if (stub != null) {
+      builder.setStub(stub.get());
+    }
+    builder.setRegion(topicPath().location().region());
+    return new TopicBacklogReaderImpl(TopicStatsClient.create(builder.build()), topicPath());
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java
index a40ba9f..6c652c4 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReaderTest.java
@@ -35,10 +35,14 @@ import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.internal.FakeApiService;
 import com.google.cloud.pubsublite.internal.PullSubscriber;
 import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.protobuf.Duration;
 import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Durations;
 import com.google.protobuf.util.Timestamps;
+import io.grpc.Status;
 import io.grpc.StatusException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -49,6 +53,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteUnboundedReader.SubscriberState;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.Instant;
@@ -70,12 +75,33 @@ public class PubsubLiteUnboundedReaderTest {
 
   abstract static class CommitterFakeService extends FakeApiService implements Committer {}
 
+  private static class FakeTicker extends Ticker {
+    private Timestamp time;
+
+    FakeTicker(Timestamp start) {
+      time = start;
+    }
+
+    @Override
+    public long read() {
+      return Timestamps.toNanos(time);
+    }
+
+    public void advance(Duration duration) {
+      time = Timestamps.add(time, duration);
+    }
+  }
+
   @Spy private CommitterFakeService committer5;
   @Spy private CommitterFakeService committer8;
 
   @SuppressWarnings("unchecked")
   private final UnboundedSource<SequencedMessage, ?> source = mock(UnboundedSource.class);
 
+  @Mock private TopicBacklogReader backlogReader;
+
+  private final FakeTicker ticker = new FakeTicker(Timestamps.fromSeconds(450));
+
   private final PubsubLiteUnboundedReader reader;
 
   private static SequencedMessage exampleMessage(Offset offset, Timestamp publishTime) {
@@ -104,7 +130,10 @@ public class PubsubLiteUnboundedReaderTest {
     state8.committer = committer8;
     reader =
         new PubsubLiteUnboundedReader(
-            source, ImmutableMap.of(Partition.of(5), state5, Partition.of(8), state8));
+            source,
+            ImmutableMap.of(Partition.of(5), state5, Partition.of(8), state8),
+            backlogReader,
+            ticker);
   }
 
   @Test
@@ -228,4 +257,74 @@ public class PubsubLiteUnboundedReaderTest {
     mark.finalizeCheckpoint();
     verify(committer5).commitOffset(Offset.of(10));
   }
+
+  @Test
+  public void splitBacklogBytes_returnsUnknownBacklogOnError() throws Exception {
+    when(backlogReader.computeMessageStats(ImmutableMap.of()))
+        .thenReturn(ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
+    assertThat(PubsubLiteUnboundedReader.BACKLOG_UNKNOWN, equalTo(reader.getSplitBacklogBytes()));
+  }
+
+  @Test
+  public void splitBacklogBytes_computesBacklog() throws Exception {
+    ComputeMessageStatsResponse response =
+        ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+    when(backlogReader.computeMessageStats(ImmutableMap.of()))
+        .thenReturn(ApiFutures.immediateFuture(response));
+    assertThat(response.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void splitBacklogBytes_computesBacklogOncePerTenSeconds() throws Exception {
+    ComputeMessageStatsResponse response1 =
+        ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+    ComputeMessageStatsResponse response2 =
+        ComputeMessageStatsResponse.newBuilder().setMessageBytes(50).build();
+
+    when(backlogReader.computeMessageStats(ImmutableMap.of()))
+        .thenReturn(ApiFutures.immediateFuture(response1), ApiFutures.immediateFuture(response2));
+
+    assertThat(response1.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+    ticker.advance(Durations.fromSeconds(10));
+    assertThat(response1.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+    ticker.advance(Durations.fromSeconds(1));
+    assertThat(response2.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void splitBacklogBytes_oldValueExpiresAfterOneMinute() throws Exception {
+    ComputeMessageStatsResponse response =
+        ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+
+    when(backlogReader.computeMessageStats(ImmutableMap.of()))
+        .thenReturn(
+            ApiFutures.immediateFuture(response),
+            ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
+
+    assertThat(response.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+    ticker.advance(Durations.fromSeconds(30));
+    assertThat(response.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+    ticker.advance(Durations.fromSeconds(31));
+    assertThat(PubsubLiteUnboundedReader.BACKLOG_UNKNOWN, equalTo(reader.getSplitBacklogBytes()));
+  }
+
+  @Test
+  public void splitBacklogBytes_usesCorrectCursorValues() throws Exception {
+    SequencedMessage message1 = exampleMessage(Offset.of(10), randomMilliAllignedTimestamp());
+    SequencedMessage message2 = exampleMessage(Offset.of(888), randomMilliAllignedTimestamp());
+    ComputeMessageStatsResponse response =
+        ComputeMessageStatsResponse.newBuilder().setMessageBytes(40).build();
+
+    when(subscriber5.pull()).thenReturn(ImmutableList.of(message1));
+    when(subscriber8.pull()).thenReturn(ImmutableList.of(message2));
+    when(backlogReader.computeMessageStats(
+            ImmutableMap.of(Partition.of(5), Offset.of(10), Partition.of(8), Offset.of(888))))
+        .thenReturn(ApiFutures.immediateFuture(response));
+
+    assertTrue(reader.start());
+    assertTrue(reader.advance());
+    assertThat(response.getMessageBytes(), equalTo(reader.getSplitBacklogBytes()));
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImplTest.java
new file mode 100644
index 0000000..090f723
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderImplTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.CloudZone;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.ProjectNumber;
+import com.google.cloud.pubsublite.TopicName;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.protobuf.Timestamp;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.StatusException;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+@SuppressWarnings("uninitialized")
+@RunWith(JUnit4.class)
+public final class TopicBacklogReaderImplTest {
+
+  @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+
+  @Mock TopicStatsClient mockClient;
+
+  private TopicPath topicPath;
+  private TopicBacklogReader reader;
+
+  @Before
+  public void setUp() throws Exception {
+    this.topicPath =
+        TopicPath.newBuilder()
+            .setProject(ProjectNumber.of(4))
+            .setName(TopicName.of("test"))
+            .setLocation(CloudZone.parse("us-central1-b"))
+            .build();
+    this.reader = new TopicBacklogReaderImpl(mockClient, topicPath);
+  }
+
+  @SuppressWarnings("incompatible")
+  @Test
+  public void computeMessageStats_partialFailure() throws Exception {
+    ComputeMessageStatsResponse partition1 = ComputeMessageStatsResponse.getDefaultInstance();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFailedFuture(new StatusException(Status.UNAVAILABLE)));
+
+    ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    ExecutionException ex = assertThrows(ExecutionException.class, future::get);
+    assertEquals(Code.UNAVAILABLE, ExtractStatus.extract(ex.getCause()).get().getCode());
+  }
+
+  @Test
+  public void computeMessageStats_aggregatesEmptyMessages() throws Exception {
+    ComputeMessageStatsResponse partition1 = ComputeMessageStatsResponse.getDefaultInstance();
+    ComputeMessageStatsResponse partition2 = ComputeMessageStatsResponse.getDefaultInstance();
+    ComputeMessageStatsResponse aggregate = ComputeMessageStatsResponse.getDefaultInstance();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition2));
+
+    ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    assertEquals(future.get(), aggregate);
+  }
+
+  @Test
+  public void computeMessageStats_timestampsAggregatedWhenPresent() throws Exception {
+    Timestamp minEventTime = Timestamp.newBuilder().setSeconds(1000).setNanos(10).build();
+    Timestamp minPublishTime = Timestamp.newBuilder().setSeconds(1001).setNanos(11).build();
+    ComputeMessageStatsResponse partition1 =
+        ComputeMessageStatsResponse.newBuilder().setMinimumPublishTime(minPublishTime).build();
+    ComputeMessageStatsResponse partition2 =
+        ComputeMessageStatsResponse.newBuilder().setMinimumEventTime(minEventTime).build();
+    ComputeMessageStatsResponse aggregate =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition2));
+
+    ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    assertEquals(future.get(), aggregate);
+  }
+
+  @Test
+  public void computeMessageStats_resultsAggregated() throws Exception {
+    Timestamp minEventTime = Timestamp.newBuilder().setSeconds(1000).setNanos(10).build();
+    Timestamp minPublishTime = Timestamp.newBuilder().setSeconds(1001).setNanos(11).build();
+    ComputeMessageStatsResponse partition1 =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(10)
+            .setMessageBytes(100)
+            .setMinimumEventTime(minEventTime.toBuilder().setSeconds(1002).build())
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+    ComputeMessageStatsResponse partition2 =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(20)
+            .setMessageBytes(200)
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime.toBuilder().setNanos(12).build())
+            .build();
+    ComputeMessageStatsResponse aggregate =
+        ComputeMessageStatsResponse.newBuilder()
+            .setMessageCount(30)
+            .setMessageBytes(300)
+            .setMinimumEventTime(minEventTime)
+            .setMinimumPublishTime(minPublishTime)
+            .build();
+
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(1), Offset.of(10), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition1));
+    when(mockClient.computeMessageStats(
+            topicPath, Partition.of(2), Offset.of(20), Offset.of(Integer.MAX_VALUE)))
+        .thenReturn(ApiFutures.immediateFuture(partition2));
+
+    ImmutableMap.Builder<Partition, Offset> builder = ImmutableMap.builder();
+    ApiFuture<ComputeMessageStatsResponse> future =
+        reader.computeMessageStats(
+            ImmutableMap.of(Partition.of(1), Offset.of(10), Partition.of(2), Offset.of(20)));
+
+    assertEquals(future.get(), aggregate);
+  }
+}