You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/12/28 21:01:02 UTC
[beam] branch master updated: [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client and channel reuse (#16358)
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d7ccd0f [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client and channel reuse (#16358)
d7ccd0f is described below
commit d7ccd0fe39958c4cd33fd1da850f8f13b2874495
Author: dpcollins-google <40...@users.noreply.github.com>
AuthorDate: Tue Dec 28 15:59:56 2021 -0500
[BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client and channel reuse (#16358)
* [BEAM-13402] Version bump Pub/Sub Lite and implement changes to ensure client (and channel) reuse
* revert TopicBacklogReader changes- ManagedBacklogReader already handles this
* reformat
* revert TopicBacklogReaderSettings changes
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
.../pubsublite/internal/InitialOffsetReader.java | 5 +-
.../internal/InitialOffsetReaderImpl.java | 13 +--
.../internal/PerSubscriptionPartitionSdf.java | 7 +-
.../pubsublite/internal/PublisherAssembler.java | 57 +++++++----
.../pubsublite/internal/SubscribeTransform.java | 11 ++-
.../pubsublite/internal/SubscriberAssembler.java | 105 +++++++++++++--------
.../SubscriptionPartitionProcessorImpl.java | 2 +-
.../internal/TopicBacklogReaderImpl.java | 22 +----
.../internal/TopicBacklogReaderSettings.java | 45 ++-------
10 files changed, 136 insertions(+), 133 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 76d7b47..a6c53d1 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -452,7 +452,7 @@ class BeamModulePlugin implements Plugin<Project> {
def errorprone_version = "2.3.4"
def google_clients_version = "1.32.1"
def google_cloud_bigdataoss_version = "2.2.4"
- def google_cloud_pubsublite_version = "1.4.6"
+ def google_cloud_pubsublite_version = "1.4.7"
def google_code_gson_version = "2.8.9"
def google_oauth_clients_version = "1.32.1"
// Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java
index 402a37b..292d3e1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReader.java
@@ -20,9 +20,6 @@ package org.apache.beam.sdk.io.gcp.pubsublite.internal;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
-interface InitialOffsetReader extends AutoCloseable {
+interface InitialOffsetReader {
Offset read() throws ApiException;
-
- @Override
- void close();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java
index 5972b13..ddee9b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/InitialOffsetReaderImpl.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.pubsublite.internal;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static java.util.concurrent.TimeUnit.MINUTES;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
@@ -31,8 +32,9 @@ class InitialOffsetReaderImpl implements InitialOffsetReader {
private final SubscriptionPath subscription;
private final Partition partition;
- InitialOffsetReaderImpl(CursorClient client, SubscriptionPath subscription, Partition partition) {
- this.client = client;
+ InitialOffsetReaderImpl(
+ CursorClient unownedCursorClient, SubscriptionPath subscription, Partition partition) {
+ this.client = unownedCursorClient;
this.subscription = subscription;
this.partition = partition;
}
@@ -40,15 +42,10 @@ class InitialOffsetReaderImpl implements InitialOffsetReader {
@Override
public Offset read() throws ApiException {
try {
- Map<Partition, Offset> results = client.listPartitionCursors(subscription).get();
+ Map<Partition, Offset> results = client.listPartitionCursors(subscription).get(1, MINUTES);
return results.getOrDefault(partition, Offset.of(0));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}
-
- @Override
- public void close() {
- client.close();
- }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
index 22e1623..22b1389 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
@@ -96,11 +96,8 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
@GetInitialRestriction
public OffsetByteRange getInitialRestriction(
@Element SubscriptionPartition subscriptionPartition) {
- try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) {
- Offset offset = reader.read();
- return OffsetByteRange.of(
- new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
- }
+ Offset offset = offsetReaderFactory.apply(subscriptionPartition).read();
+ return OffsetByteRange.of(new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */));
}
@NewTracker
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java
index bb8408d..91f0d02 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherAssembler.java
@@ -17,18 +17,20 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+import static com.google.cloud.pubsublite.cloudpubsub.PublisherSettings.DEFAULT_BATCHING_SETTINGS;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
+import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
@@ -64,33 +66,50 @@ class PublisherAssembler {
}
}
- private PublisherServiceClient newServiceClient(Partition partition) {
- PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
- settingsBuilder =
- addDefaultMetadata(
- PubsubContext.of(FRAMEWORK),
- RoutingMetadata.of(options.topicPath(), partition),
- settingsBuilder);
+ private PublisherServiceClient newServiceClient() {
try {
return PublisherServiceClient.create(
- addDefaultSettings(options.topicPath().location().extractRegion(), settingsBuilder));
+ addDefaultSettings(
+ options.topicPath().location().extractRegion(),
+ PublisherServiceSettings.newBuilder()));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}
- @SuppressWarnings("unchecked")
+ private PartitionPublisherFactory getPartitionPublisherFactory() throws ApiException {
+ PublisherServiceClient client = newServiceClient();
+ return new PartitionPublisherFactory() {
+ @Override
+ public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublisher(
+ Partition partition) throws ApiException {
+ SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
+ SinglePartitionPublisherBuilder.newBuilder()
+ .setTopic(options.topicPath())
+ .setPartition(partition)
+ .setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
+ .setStreamFactory(
+ responseStream -> {
+ ApiCallContext context =
+ getCallContext(
+ PubsubContext.of(FRAMEWORK),
+ RoutingMetadata.of(options.topicPath(), partition));
+ return client.publishCallable().splitCall(responseStream, context);
+ });
+ return singlePartitionBuilder.build();
+ }
+
+ @Override
+ public void close() {
+ client.close();
+ }
+ };
+ }
+
Publisher<MessageMetadata> newPublisher() throws ApiException {
return PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(options.topicPath())
- .setPublisherFactory(
- partition ->
- SinglePartitionPublisherBuilder.newBuilder()
- .setTopic(options.topicPath())
- .setPartition(partition)
- .setServiceClient(newServiceClient(partition))
- .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS)
- .build())
+ .setPublisherFactory(getPartitionPublisherFactory())
.setAdminClient(newAdminClient())
.build()
.instantiate();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
index edf4f51..4b572b6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
@@ -43,6 +43,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
import org.joda.time.Duration;
public class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
+ private static final long MEBIBYTE = 1L << 20;
+
private final SubscriberOptions options;
public SubscribeTransform(SubscriberOptions options) {
@@ -91,6 +93,13 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
return new SubscriberAssembler(options, subscriptionPartition.partition()).getBacklogReader();
}
+ private long calculateMinWindowBytes() {
+ long minFromFlowControl =
+ LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10);
+ // Dataflow will not accept outputs larger than 1 GiB. Cap the maximum at 750 MiB to avoid this.
+ return Math.min(minFromFlowControl, 750 * MEBIBYTE);
+ }
+
private TrackerWithProgress newRestrictionTracker(
TopicBacklogReader backlogReader, OffsetByteRange initial) {
return new OffsetByteRangeTracker(
@@ -98,7 +107,7 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
backlogReader,
Stopwatch.createUnstarted(),
options.minBundleTimeout(),
- LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
+ calculateMinWindowBytes());
}
private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
index da035be..7825ab8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
@@ -18,47 +18,66 @@
package org.apache.beam.sdk.io.gcp.pubsublite.internal;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
+import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;
import static java.util.concurrent.TimeUnit.MINUTES;
+import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CursorClient;
import com.google.cloud.pubsublite.internal.CursorClientSettings;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
-import com.google.cloud.pubsublite.proto.CommitCursorRequest;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
-import com.google.cloud.pubsublite.v1.CursorServiceClient;
-import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
class SubscriberAssembler {
private static final Framework FRAMEWORK = Framework.of("BEAM");
+ private static final ConcurrentHashMap<SubscriptionPath, TopicPath> KNOWN_PATHS =
+ new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<SubscriptionPath, SubscriberServiceClient> SUB_CLIENTS =
+ new ConcurrentHashMap<>();
+ private static final ConcurrentHashMap<SubscriptionPath, CursorClient> CURSOR_CLIENTS =
+ new ConcurrentHashMap<>();
+
private final SubscriberOptions options;
private final Partition partition;
- SubscriberAssembler(SubscriberOptions options, Partition partition) {
- this.options = options;
- this.partition = partition;
+ private static TopicPath lookupTopicPath(SubscriptionPath subscriptionPath) {
+ try (AdminClient adminClient =
+ AdminClient.create(
+ AdminClientSettings.newBuilder()
+ .setRegion(subscriptionPath.location().extractRegion())
+ .build())) {
+ return TopicPath.parse(
+ adminClient.getSubscription(subscriptionPath).get(1, MINUTES).getTopic());
+ } catch (Throwable t) {
+ throw ExtractStatus.toCanonical(t).underlying;
+ }
+ }
+
+ private TopicPath getTopicPath() {
+ return KNOWN_PATHS.computeIfAbsent(
+ options.subscriptionPath(), SubscriberAssembler::lookupTopicPath);
}
private SubscriberServiceClient newSubscriberServiceClient() throws ApiException {
try {
SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder();
- settingsBuilder =
- addDefaultMetadata(
- PubsubContext.of(FRAMEWORK),
- RoutingMetadata.of(options.subscriptionPath(), partition),
- settingsBuilder);
return SubscriberServiceClient.create(
addDefaultSettings(
options.subscriptionPath().location().extractRegion(), settingsBuilder));
@@ -67,13 +86,42 @@ class SubscriberAssembler {
}
}
+ private SubscriberServiceClient getSubscriberServiceClient() {
+ return SUB_CLIENTS.computeIfAbsent(
+ options.subscriptionPath(), path -> newSubscriberServiceClient());
+ }
+
+ private CursorClient newCursorClient() throws ApiException {
+ return CursorClient.create(
+ CursorClientSettings.newBuilder()
+ .setRegion(options.subscriptionPath().location().extractRegion())
+ .build());
+ }
+
+ private CursorClient getCursorClient() {
+ return CURSOR_CLIENTS.computeIfAbsent(options.subscriptionPath(), path -> newCursorClient());
+ }
+
+ SubscriberAssembler(SubscriberOptions options, Partition partition) {
+ this.options = options;
+ this.partition = partition;
+ }
+
SubscriberFactory getSubscriberFactory(Offset initialOffset) {
+ SubscriberServiceClient client = getSubscriberServiceClient();
return consumer ->
SubscriberBuilder.newBuilder()
.setMessageConsumer(consumer)
.setSubscriptionPath(options.subscriptionPath())
.setPartition(partition)
- .setServiceClient(newSubscriberServiceClient())
+ .setStreamFactory(
+ responseStream -> {
+ ApiCallContext context =
+ getCallContext(
+ PubsubContext.of(FRAMEWORK),
+ RoutingMetadata.of(options.subscriptionPath(), partition));
+ return client.subscribeCallable().splitCall(responseStream, context);
+ })
.setInitialLocation(
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(initialOffset.value()))
@@ -81,28 +129,11 @@ class SubscriberAssembler {
.build();
}
- private CursorServiceClient newCursorServiceClient() throws ApiException {
- try {
- return CursorServiceClient.create(
- addDefaultSettings(
- options.subscriptionPath().location().extractRegion(),
- CursorServiceSettings.newBuilder()));
- } catch (Throwable t) {
- throw toCanonical(t).underlying;
- }
- }
-
BlockingCommitter getCommitter() {
return offset -> {
try {
- newCursorServiceClient()
- .commitCursorCallable()
- .futureCall(
- CommitCursorRequest.newBuilder()
- .setSubscription(options.subscriptionPath().toString())
- .setPartition(partition.value())
- .setCursor(Cursor.newBuilder().setOffset(offset.value()))
- .build())
+ getCursorClient()
+ .commitCursor(options.subscriptionPath(), partition, offset)
.get(1, MINUTES);
} catch (Throwable t) {
throw toCanonical(t).underlying;
@@ -112,19 +143,13 @@ class SubscriberAssembler {
TopicBacklogReader getBacklogReader() {
return TopicBacklogReaderSettings.newBuilder()
- .setTopicPathFromSubscriptionPath(options.subscriptionPath())
+ .setTopicPath(getTopicPath())
.setPartition(partition)
.build()
.instantiate();
}
InitialOffsetReader getInitialOffsetReader() {
- return new InitialOffsetReaderImpl(
- CursorClient.create(
- CursorClientSettings.newBuilder()
- .setRegion(options.subscriptionPath().location().extractRegion())
- .build()),
- options.subscriptionPath(),
- partition);
+ return new InitialOffsetReaderImpl(getCursorClient(), options.subscriptionPath(), partition);
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
index 83ef45d..8b2eff9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
@@ -55,7 +55,7 @@ class SubscriptionPartitionProcessorImpl extends Listener
private final OutputReceiver<SequencedMessage> receiver;
private final Subscriber subscriber;
private final SettableFuture<Void> completionFuture = SettableFuture.create();
- // Queue to transfer messages from subscriber callback to runFor downcall, since all
+ // Queue to transfer messages from subscriber callback to runFor downcall.
private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
private final FlowControlSettings flowControlSettings;
private Optional<Offset> lastClaimedOffset = Optional.empty();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java
index fd27081..96980b5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderImpl.java
@@ -18,7 +18,7 @@
package org.apache.beam.sdk.io.gcp.pubsublite.internal;
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.MINUTES;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
@@ -26,13 +26,8 @@ import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nonnull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
final class TopicBacklogReaderImpl implements TopicBacklogReader {
- private static final Logger LOG = LoggerFactory.getLogger(TopicBacklogReaderImpl.class);
private final TopicStatsClient client;
private final TopicPath topicPath;
private final Partition partition;
@@ -49,21 +44,14 @@ final class TopicBacklogReaderImpl implements TopicBacklogReader {
try {
return client
.computeMessageStats(topicPath, partition, offset, Offset.of(Integer.MAX_VALUE))
- .get();
- } catch (ExecutionException e) {
- @Nonnull Throwable cause = checkNotNull(e.getCause());
- throw toCanonical(cause).underlying;
- } catch (InterruptedException e) {
- throw toCanonical(e).underlying;
+ .get(1, MINUTES);
+ } catch (Throwable t) {
+ throw toCanonical(t).underlying;
}
}
@Override
public void close() {
- try {
- client.close();
- } catch (Exception e) {
- LOG.warn("Failed to close topic stats client.", e);
- }
+ client.close();
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java
index a940bfd..45f9f40 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/TopicBacklogReaderSettings.java
@@ -17,32 +17,20 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite.internal;
-import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
-
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
-import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Partition;
-import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import java.io.Serializable;
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nonnull;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
@AutoValue
abstract class TopicBacklogReaderSettings implements Serializable {
private static final long serialVersionUID = -4001752066450248673L;
- /**
- * The topic path for this backlog reader. Either topicPath or subscriptionPath must be set. If
- * both are set, subscriptionPath will be ignored.
- */
+ /** The topic path for this backlog reader. */
abstract TopicPath topicPath();
abstract Partition partition();
@@ -57,37 +45,20 @@ abstract class TopicBacklogReaderSettings implements Serializable {
// Required parameters.
abstract Builder setTopicPath(TopicPath topicPath);
- @SuppressWarnings("assignment.type.incompatible")
- Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath)
- throws ApiException {
- try (AdminClient adminClient =
- AdminClient.create(
- AdminClientSettings.newBuilder()
- .setRegion(subscriptionPath.location().extractRegion())
- .build())) {
- return setTopicPath(
- TopicPath.parse(
- adminClient.getSubscription(subscriptionPath).get(1, MINUTES).getTopic()));
- } catch (ExecutionException e) {
- @Nonnull Throwable cause = checkNotNull(e.getCause());
- throw ExtractStatus.toCanonical(cause).underlying;
- } catch (Throwable t) {
- throw ExtractStatus.toCanonical(t).underlying;
- }
- }
-
abstract Builder setPartition(Partition partition);
abstract TopicBacklogReaderSettings build();
}
- TopicBacklogReader instantiate() throws ApiException {
- TopicStatsClientSettings settings =
+ private TopicStatsClient newClient() {
+ return TopicStatsClient.create(
TopicStatsClientSettings.newBuilder()
.setRegion(topicPath().location().extractRegion())
- .build();
- TopicBacklogReader impl =
- new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition());
+ .build());
+ }
+
+ TopicBacklogReader instantiate() throws ApiException {
+ TopicBacklogReader impl = new TopicBacklogReaderImpl(newClient(), topicPath(), partition());
return new LimitingTopicBacklogReader(impl, Ticker.systemTicker());
}
}