You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/21 17:23:14 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #17125: [BEAM-14129] Restructure PubsubLiteIO Read side to produce smaller bundles

TheNeuralBit commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r831358967



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,111 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
+  // getReadySubscriber doesn't reference the subscriber member.
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
-  }
-
-  @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
-      try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
-      }
-    } else {
-      completionFuture.set(null);
-    }
+    this.subscriber = getReadySubscriber(subscriberFactory);

Review comment:
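      The suppression doesn't seem to work in PreCommit. The checker reports `method.invocation.invalid`, but the constructor only suppresses `methodref.receiver.bound.invalid`: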
   
   ```
   08:31:50 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_SQL_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java:58: error: [method.invocation.invalid] call to getReadySubscriber(java.util.function.Supplier<org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryBufferedSubscriber>) not allowed on the given receiver.
   08:31:50     this.subscriber = getReadySubscriber(subscriberFactory);
   08:31:50                                         ^
   08:31:50   found   : @UnderInitialization @NonNull SubscriptionPartitionProcessorImpl
   08:31:50   required: @Initialized @NonNull SubscriptionPartitionProcessorImpl
   ```
   
   You may be able to reproduce this locally by running the build with the Checker Framework enabled: https://cwiki.apache.org/confluence/display/BEAM/Java+Tips#JavaTips-WhatcommandshouldIrunlocallybeforecreatingapullrequest?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org