You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/12/28 21:01:02 UTC

[beam] branch master updated: [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client and channel reuse (#16358)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ccd0f  [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client and channel reuse (#16358)
d7ccd0f is described below

commit d7ccd0fe39958c4cd33fd1da850f8f13b2874495
Author: dpcollins-google <40...@users.noreply.github.com>
AuthorDate: Tue Dec 28 15:59:56 2021 -0500

    [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client and channel reuse (#16358)
    
    * [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client (and channel) reuse
    
    * revert TopicBacklogReader changes - ManagedBacklogReader already handles this
    
    * reformat
    
    * revert TopicBacklogReaderSettings changes
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 .../pubsublite/internal/InitialOffsetReader.java   |   5 +-
 .../internal/InitialOffsetReaderImpl.java          |  13 +--
 .../internal/PerSubscriptionPartitionSdf.java      |   7 +-
 .../pubsublite/internal/PublisherAssembler.java    |  57 +++++++----
 .../pubsublite/internal/SubscribeTransform.java    |  11 ++-
 .../pubsublite/internal/SubscriberAssembler.java   | 105 +++++++++++++--------
 .../SubscriptionPartitionProcessorImpl.java        |   2 +-
 .../internal/TopicBacklogReaderImpl.java           |  22 +----
 .../internal/TopicBacklogReaderSettings.java       |  45 ++-------
 10 files changed, 136 insertions(+), 133 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 76d7b47..a6c53d1 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -452,7 +452,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def errorprone_version = "2.3.4"
     def google_clients_version = "1.32.1"
     def google_cloud_bigdataoss_version = "2.2.4"
-    def google_cloud_pubsublite_version = "1.4.6"
+    def google_cloud_pubsublite_version = "1.4.7"
     def google_code_gson_version = "2.8.9"
     def google_oauth_clients_version = "1.32.1"
     // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java
index 402a37b..292d3e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java
@@ -20,9 +20,6 @@ package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
 
-interface InitialOffsetReader extends AutoCloseable {
+interface InitialOffsetReader {
   Offset read() throws ApiException;
-
-  @Override
-  void close();
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java
index 5972b13..ddee9b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static java.util.concurrent.TimeUnit.MINUTES;
 
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
@@ -31,8 +32,9 @@ class InitialOffsetReaderImpl implements InitialOffsetReader {
   private final SubscriptionPath subscription;
   private final Partition partition;
 
-  InitialOffsetReaderImpl(CursorClient client, SubscriptionPath subscription, Partition partition) {
-    this.client = client;
+  InitialOffsetReaderImpl(
+      CursorClient unownedCursorClient, SubscriptionPath subscription, Partition partition) {
+    this.client = unownedCursorClient;
     this.subscription = subscription;
     this.partition = partition;
   }
@@ -40,15 +42,10 @@ class InitialOffsetReaderImpl implements InitialOffsetReader {
   @Override
   public Offset read() throws ApiException {
     try {
-      Map<Partition, Offset> results = client.listPartitionCursors(subscription).get();
+      Map<Partition, Offset> results = client.listPartitionCursors(subscription).get(1, MINUTES);
       return results.getOrDefault(partition, Offset.of(0));
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
   }
-
-  @Override
-  public void close() {
-    client.close();
-  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
index 22e1623..22b1389 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
@@ -96,11 +96,8 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
   @GetInitialRestriction
   public OffsetByteRange getInitialRestriction(
       @Element SubscriptionPartition subscriptionPartition) {
-    try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) {
-      Offset offset = reader.read();
-      return OffsetByteRange.of(
-          new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
-    }
+    Offset offset = offsetReaderFactory.apply(subscriptionPartition).read();
+    return OffsetByteRange.of(new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
   }
 
   @NewTracker
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java
index bb8408d..91f0d02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
+import static com.google.cloud.pubsublite.cloudpubsub.PublisherSettings.DEFAULT_BATCHING_SETTINGS;
 import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
 import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;
 
+import com.google.api.gax.rpc.ApiCallContext;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.AdminClient;
 import com.google.cloud.pubsublite.AdminClientSettings;
 import com.google.cloud.pubsublite.MessageMetadata;
 import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
 import com.google.cloud.pubsublite.internal.Publisher;
 import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
+import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
 import com.google.cloud.pubsublite.internal.wire.PubsubContext;
 import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
@@ -64,33 +66,50 @@ class PublisherAssembler {
     }
   }
 
-  private PublisherServiceClient newServiceClient(Partition partition) {
-    PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
-    settingsBuilder =
-        addDefaultMetadata(
-            PubsubContext.of(FRAMEWORK),
-            RoutingMetadata.of(options.topicPath(), partition),
-            settingsBuilder);
+  private PublisherServiceClient newServiceClient() {
     try {
       return PublisherServiceClient.create(
-          addDefaultSettings(options.topicPath().location().extractRegion(), settingsBuilder));
+          addDefaultSettings(
+              options.topicPath().location().extractRegion(),
+              PublisherServiceSettings.newBuilder()));
     } catch (Throwable t) {
       throw toCanonical(t).underlying;
     }
   }
 
-  @SuppressWarnings("unchecked")
+  private PartitionPublisherFactory getPartitionPublisherFactory() throws ApiException {
+    PublisherServiceClient client = newServiceClient();
+    return new PartitionPublisherFactory() {
+      @Override
+      public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublisher(
+          Partition partition) throws ApiException {
+        SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
+            SinglePartitionPublisherBuilder.newBuilder()
+                .setTopic(options.topicPath())
+                .setPartition(partition)
+                .setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
+                .setStreamFactory(
+                    responseStream -> {
+                      ApiCallContext context =
+                          getCallContext(
+                              PubsubContext.of(FRAMEWORK),
+                              RoutingMetadata.of(options.topicPath(), partition));
+                      return client.publishCallable().splitCall(responseStream, context);
+                    });
+        return singlePartitionBuilder.build();
+      }
+
+      @Override
+      public void close() {
+        client.close();
+      }
+    };
+  }
+
   Publisher<MessageMetadata> newPublisher() throws ApiException {
     return PartitionCountWatchingPublisherSettings.newBuilder()
         .setTopic(options.topicPath())
-        .setPublisherFactory(
-            partition ->
-                SinglePartitionPublisherBuilder.newBuilder()
-                    .setTopic(options.topicPath())
-                    .setPartition(partition)
-                    .setServiceClient(newServiceClient(partition))
-                    .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
-                    .build())
+        .setPublisherFactory(getPartitionPublisherFactory())
         .setAdminClient(newAdminClient())
         .build()
         .instantiate();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
index edf4f51..4b572b6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
@@ -43,6 +43,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
 import org.joda.time.Duration;
 
 public class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
+  private static final long MEBIBYTE = 1L << 20;
+
   private final SubscriberOptions options;
 
   public SubscribeTransform(SubscriberOptions options) {
@@ -91,6 +93,13 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
     return new SubscriberAssembler(options, subscriptionPartition.partition()).getBacklogReader();
   }
 
+  private long calculateMinWindowBytes() {
+    long minFromFlowControl =
+        LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10);
+    // Dataflow will not accept outputs larger than 1 GiB. Cap the maximum at 750 MiB to avoid this.
+    return Math.min(minFromFlowControl, 750 * MEBIBYTE);
+  }
+
   private TrackerWithProgress newRestrictionTracker(
       TopicBacklogReader backlogReader, OffsetByteRange initial) {
     return new OffsetByteRangeTracker(
@@ -98,7 +107,7 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
         backlogReader,
         Stopwatch.createUnstarted(),
         options.minBundleTimeout(),
-        LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
+        calculateMinWindowBytes());
   }
 
   private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
index da035be..7825ab8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
@@ -18,47 +18,66 @@
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
 import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;
 import static java.util.concurrent.TimeUnit.MINUTES;
 
+import com.google.api.gax.rpc.ApiCallContext;
 import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.internal.CursorClient;
 import com.google.cloud.pubsublite.internal.CursorClientSettings;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.wire.PubsubContext;
 import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
 import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
 import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
 import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.CommitCursorRequest;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.SeekRequest;
-import com.google.cloud.pubsublite.v1.CursorServiceClient;
-import com.google.cloud.pubsublite.v1.CursorServiceSettings;
 import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
 import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
 
 class SubscriberAssembler {
   private static final Framework FRAMEWORK = Framework.of("BEAM");
+  private static final ConcurrentHashMap<SubscriptionPath, TopicPath> KNOWN_PATHS =
+      new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<SubscriptionPath, SubscriberServiceClient> SUB_CLIENTS =
+      new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<SubscriptionPath, CursorClient> CURSOR_CLIENTS =
+      new ConcurrentHashMap<>();
+
   private final SubscriberOptions options;
   private final Partition partition;
 
-  SubscriberAssembler(SubscriberOptions options, Partition partition) {
-    this.options = options;
-    this.partition = partition;
+  private static TopicPath lookupTopicPath(SubscriptionPath subscriptionPath) {
+    try (AdminClient adminClient =
+        AdminClient.create(
+            AdminClientSettings.newBuilder()
+                .setRegion(subscriptionPath.location().extractRegion())
+                .build())) {
+      return TopicPath.parse(
+          adminClient.getSubscription(subscriptionPath).get(1, MINUTES).getTopic());
+    } catch (Throwable t) {
+      throw ExtractStatus.toCanonical(t).underlying;
+    }
+  }
+
+  private TopicPath getTopicPath() {
+    return KNOWN_PATHS.computeIfAbsent(
+        options.subscriptionPath(), SubscriberAssembler::lookupTopicPath);
   }
 
   private SubscriberServiceClient newSubscriberServiceClient() throws ApiException {
     try {
       SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder();
-      settingsBuilder =
-          addDefaultMetadata(
-              PubsubContext.of(FRAMEWORK),
-              RoutingMetadata.of(options.subscriptionPath(), partition),
-              settingsBuilder);
       return SubscriberServiceClient.create(
           addDefaultSettings(
               options.subscriptionPath().location().extractRegion(), settingsBuilder));
@@ -67,13 +86,42 @@ class SubscriberAssembler {
     }
   }
 
+  private SubscriberServiceClient getSubscriberServiceClient() {
+    return SUB_CLIENTS.computeIfAbsent(
+        options.subscriptionPath(), path -> newSubscriberServiceClient());
+  }
+
+  private CursorClient newCursorClient() throws ApiException {
+    return CursorClient.create(
+        CursorClientSettings.newBuilder()
+            .setRegion(options.subscriptionPath().location().extractRegion())
+            .build());
+  }
+
+  private CursorClient getCursorClient() {
+    return CURSOR_CLIENTS.computeIfAbsent(options.subscriptionPath(), path -> newCursorClient());
+  }
+
+  SubscriberAssembler(SubscriberOptions options, Partition partition) {
+    this.options = options;
+    this.partition = partition;
+  }
+
   SubscriberFactory getSubscriberFactory(Offset initialOffset) {
+    SubscriberServiceClient client = getSubscriberServiceClient();
     return consumer ->
         SubscriberBuilder.newBuilder()
             .setMessageConsumer(consumer)
             .setSubscriptionPath(options.subscriptionPath())
             .setPartition(partition)
-            .setServiceClient(newSubscriberServiceClient())
+            .setStreamFactory(
+                responseStream -> {
+                  ApiCallContext context =
+                      getCallContext(
+                          PubsubContext.of(FRAMEWORK),
+                          RoutingMetadata.of(options.subscriptionPath(), partition));
+                  return client.subscribeCallable().splitCall(responseStream, context);
+                })
             .setInitialLocation(
                 SeekRequest.newBuilder()
                     .setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
@@ -81,28 +129,11 @@ class SubscriberAssembler {
             .build();
   }
 
-  private CursorServiceClient newCursorServiceClient() throws ApiException {
-    try {
-      return CursorServiceClient.create(
-          addDefaultSettings(
-              options.subscriptionPath().location().extractRegion(),
-              CursorServiceSettings.newBuilder()));
-    } catch (Throwable t) {
-      throw toCanonical(t).underlying;
-    }
-  }
-
   BlockingCommitter getCommitter() {
     return offset -> {
       try {
-        newCursorServiceClient()
-            .commitCursorCallable()
-            .futureCall(
-                CommitCursorRequest.newBuilder()
-                    .setSubscription(options.subscriptionPath().toString())
-                    .setPartition(partition.value())
-                    .setCursor(Cursor.newBuilder().setOffset(offset.value()))
-                    .build())
+        getCursorClient()
+            .commitCursor(options.subscriptionPath(), partition, offset)
             .get(1, MINUTES);
       } catch (Throwable t) {
         throw toCanonical(t).underlying;
@@ -112,19 +143,13 @@ class SubscriberAssembler {
 
   TopicBacklogReader getBacklogReader() {
     return TopicBacklogReaderSettings.newBuilder()
-        .setTopicPathFromSubscriptionPath(options.subscriptionPath())
+        .setTopicPath(getTopicPath())
         .setPartition(partition)
         .build()
         .instantiate();
   }
 
   InitialOffsetReader getInitialOffsetReader() {
-    return new InitialOffsetReaderImpl(
-        CursorClient.create(
-            CursorClientSettings.newBuilder()
-                .setRegion(options.subscriptionPath().location().extractRegion())
-                .build()),
-        options.subscriptionPath(),
-        partition);
+    return new InitialOffsetReaderImpl(getCursorClient(), options.subscriptionPath(), partition);
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
index 83ef45d..8b2eff9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
@@ -55,7 +55,7 @@ class SubscriptionPartitionProcessorImpl extends Listener
   private final OutputReceiver<SequencedMessage> receiver;
   private final Subscriber subscriber;
   private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall, since all
+  // Queue to transfer messages from subscriber callback to runFor downcall.
   private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
   private final FlowControlSettings flowControlSettings;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java
index fd27081..96980b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java
@@ -18,7 +18,7 @@
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.MINUTES;
 
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
@@ -26,13 +26,8 @@ import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.internal.TopicStatsClient;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nonnull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 final class TopicBacklogReaderImpl implements TopicBacklogReader {
-  private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogReaderImpl.class);
   private final TopicStatsClient client;
   private final TopicPath topicPath;
   private final Partition partition;
@@ -49,21 +44,14 @@ final class TopicBacklogReaderImpl implements TopicBacklogReader {
     try {
       return client
           .computeMessageStats(topicPath, partition, offset, Offset.of(Integer.MAX_VALUE))
-          .get();
-    } catch (ExecutionException e) {
-      @Nonnull Throwable cause = checkNotNull(e.getCause());
-      throw toCanonical(cause).underlying;
-    } catch (InterruptedException e) {
-      throw toCanonical(e).underlying;
+          .get(1, MINUTES);
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
     }
   }
 
   @Override
   public void close() {
-    try {
-      client.close();
-    } catch (Exception e) {
-      LOG.warn("Failed to close topic stats client.", e);
-    }
+    client.close();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java
index a940bfd..45f9f40 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java
@@ -17,32 +17,20 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
-
 import com.google.api.gax.rpc.ApiException;
 import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.AdminClientSettings;
 import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
 import com.google.cloud.pubsublite.internal.TopicStatsClient;
 import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
 import java.io.Serializable;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nonnull;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
 
 @AutoValue
 abstract class TopicBacklogReaderSettings implements Serializable {
   private static final long serialVersionUID = -4001752066450248673L;
 
-  /**
-   * The topic path for this backlog reader. Either topicPath or subscriptionPath must be set. If
-   * both are set, subscriptionPath will be ignored.
-   */
+  /** The topic path for this backlog reader. */
   abstract TopicPath topicPath();
 
   abstract Partition partition();
@@ -57,37 +45,20 @@ abstract class TopicBacklogReaderSettings implements Serializable {
     // Required parameters.
     abstract Builder setTopicPath(TopicPath topicPath);
 
-    @SuppressWarnings("assignment.type.incompatible")
-    Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath)
-        throws ApiException {
-      try (AdminClient adminClient =
-          AdminClient.create(
-              AdminClientSettings.newBuilder()
-                  .setRegion(subscriptionPath.location().extractRegion())
-                  .build())) {
-        return setTopicPath(
-            TopicPath.parse(
-                adminClient.getSubscription(subscriptionPath).get(1, MINUTES).getTopic()));
-      } catch (ExecutionException e) {
-        @Nonnull Throwable cause = checkNotNull(e.getCause());
-        throw ExtractStatus.toCanonical(cause).underlying;
-      } catch (Throwable t) {
-        throw ExtractStatus.toCanonical(t).underlying;
-      }
-    }
-
     abstract Builder setPartition(Partition partition);
 
     abstract TopicBacklogReaderSettings build();
   }
 
-  TopicBacklogReader instantiate() throws ApiException {
-    TopicStatsClientSettings settings =
+  private TopicStatsClient newClient() {
+    return TopicStatsClient.create(
         TopicStatsClientSettings.newBuilder()
             .setRegion(topicPath().location().extractRegion())
-            .build();
-    TopicBacklogReader impl =
-        new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition());
+            .build());
+  }
+
+  TopicBacklogReader instantiate() throws ApiException {
+    TopicBacklogReader impl = new TopicBacklogReaderImpl(newClient(), topicPath(), partition());
     return new LimitingTopicBacklogReader(impl, Ticker.systemTicker());
   }
 }