You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/05/25 01:21:45 UTC

[beam] branch master updated: [BEAM-14129] Clean up PubsubLiteIO by removing options that no longer apply (#17169)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d50f1b1790a [BEAM-14129] Clean up PubsubLiteIO by removing options that no longer apply (#17169)
d50f1b1790a is described below

commit d50f1b1790aeb038bd9695b18c2e7274c3a6e64a
Author: dpcollins-google <40...@users.noreply.github.com>
AuthorDate: Tue May 24 21:21:37 2022 -0400

    [BEAM-14129] Clean up PubsubLiteIO by removing options that no longer apply (#17169)
    
    * [BEAM-14129] Clean up PubsubLiteIO by removing options that no longer apply
    
    Also remove explicit blocking on data delivery; instead, yield to the runtime if no data is available, since reading from Pub/Sub Lite is now performed in the background.
    
    * [BEAM-14129] Clean up PubsubLiteIO by removing options that no longer apply
    
    Also remove explicit blocking on data delivery; instead, yield to the runtime if no data is available, since reading from Pub/Sub Lite is now performed in the background.
    
    * [BEAM-14129] Clean up PubsubLiteIO by removing options that no longer apply
    
    Also remove explicit blocking on data delivery; instead, yield to the runtime if no data is available, since reading from Pub/Sub Lite is now performed in the background.
    
    * [BEAM-14129] Perform all file writes in WriteFiles in the background
    
    This should allow bundles that write to multiple files to write to each of them in parallel, instead of blocking when any one of their buffers fills up.
    
    * Fix python formatting issues
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 .../sdk/io/gcp/pubsublite/SubscriberOptions.java   | 34 +------------
 .../internal/ExternalTransformConfig.java          |  5 --
 .../internal/MemoryBufferedSubscriber.java         |  7 ---
 .../internal/MemoryBufferedSubscriberImpl.java     | 15 ------
 .../internal/PerSubscriptionPartitionSdf.java      |  6 +--
 .../pubsublite/internal/SubscribeTransform.java    |  3 --
 .../internal/SubscriptionPartitionProcessor.java   |  3 +-
 .../SubscriptionPartitionProcessorImpl.java        | 57 ++++++----------------
 .../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java    |  8 +--
 .../internal/MemoryBufferedSubscriberImplTest.java | 24 +--------
 .../internal/PerSubscriptionPartitionSdfTest.java  |  9 +---
 .../SubscriptionPartitionProcessorImplTest.java    | 33 ++-----------
 .../apache_beam/io/gcp/pubsublite/external.py      | 15 +-----
 .../apache_beam/io/gcp/pubsublite/proto_api.py     |  6 ---
 14 files changed, 31 insertions(+), 194 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
index 303cff49e90..4689a19e5c6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java
@@ -19,45 +19,18 @@ package org.apache.beam.sdk.io.gcp.pubsublite;
 
 import com.google.auto.value.AutoValue;
 import com.google.cloud.pubsublite.SubscriptionPath;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import java.io.Serializable;
-import org.joda.time.Duration;
 
 @AutoValue
 public abstract class SubscriberOptions implements Serializable {
   private static final long serialVersionUID = 269598118L;
 
-  private static final long MEBIBYTE = 1L << 20;
-
-  private static final Duration MIN_BUNDLE_TIMEOUT = Duration.standardMinutes(1);
-
-  public static final FlowControlSettings DEFAULT_FLOW_CONTROL =
-      FlowControlSettings.builder()
-          .setMessagesOutstanding(Long.MAX_VALUE)
-          .setBytesOutstanding(100 * MEBIBYTE)
-          .build();
-
   // Required parameters.
   public abstract SubscriptionPath subscriptionPath();
 
-  // Optional parameters.
-  /** Per-partition flow control parameters for this subscription. */
-  public abstract FlowControlSettings flowControlSettings();
-
-  /**
-   * The minimum wall time to pass before allowing bundle closure.
-   *
-   * <p>Setting this to too small of a value will result in increased compute costs and lower
-   * throughput per byte. Immediate timeouts (Duration.ZERO) may be useful for testing.
-   */
-  public abstract Duration minBundleTimeout();
-
   public static Builder newBuilder() {
-    Builder builder = new AutoValue_SubscriberOptions.Builder();
-    return builder
-        .setFlowControlSettings(DEFAULT_FLOW_CONTROL)
-        .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
+    return new AutoValue_SubscriberOptions.Builder();
   }
 
   public abstract Builder toBuilder();
@@ -68,11 +41,6 @@ public abstract class SubscriberOptions implements Serializable {
     // Required parameters.
     public abstract Builder setSubscriptionPath(SubscriptionPath path);
 
-    // Optional parameters
-    public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
-
-    public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);
-
     public abstract SubscriberOptions build();
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ExternalTransformConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ExternalTransformConfig.java
index 4d4fcd0e34b..d0ba67ea055 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ExternalTransformConfig.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ExternalTransformConfig.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.joda.time.Duration;
 
 class ExternalTransformConfig {
   private ExternalTransformConfig() {}
@@ -76,10 +75,6 @@ class ExternalTransformConfig {
       builder.setSubscriptionPath(SubscriptionPath.parse(path));
     }
 
-    public void setMinBundleTimeout(Long durationMillis) {
-      builder.setMinBundleTimeout(Duration.millis(durationMillis));
-    }
-
     public void setDeduplicate(Boolean deduplicate) {
       this.deduplicate = deduplicate;
     }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
index 27b6d83b94f..1454581703a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiService;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
@@ -46,10 +45,4 @@ interface MemoryBufferedSubscriber extends ApiService {
 
   /** Remove the head message from the buffer. Throws if no messages exist in the buffer. */
   void pop();
-
-  /**
-   * Return a future which will be satisfied when data is likely available or the subscriber has
-   * failed.
-   */
-  ApiFuture<Void> onData();
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
index d8367970eb3..5a7cbbbcc2f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
@@ -17,9 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import com.google.api.core.ApiFuture;
-import com.google.api.core.ApiFutures;
-import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
@@ -51,7 +48,6 @@ class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBuffere
   private long bytesOutstandingToServer = 0;
   private long bytesOutstanding = 0;
   private final Queue<SequencedMessage> messages = new ArrayDeque<>();
-  private SettableApiFuture<Void> newData = SettableApiFuture.create();
   private boolean shutdown = false;
 
   // onReceive will not be called inline as subscriber is not started.
@@ -88,7 +84,6 @@ class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBuffere
       return;
     }
     shutdown = true;
-    newData.set(null);
     memBlock.close();
   }
 
@@ -105,8 +100,6 @@ class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBuffere
       bytesOutstandingToServer -= message.getSizeBytes();
     }
     messages.addAll(batch);
-    newData.set(null);
-    newData = SettableApiFuture.create();
   }
 
   @Override
@@ -162,12 +155,4 @@ class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBuffere
     bytesOutstanding -= message.getSizeBytes();
     fetchOffset = Offset.of(message.getCursor().getOffset() + 1);
   }
-
-  @Override
-  public synchronized ApiFuture<Void> onData() {
-    if (shutdown || !messages.isEmpty()) {
-      return ApiFutures.immediateFuture(null);
-    }
-    return newData;
-  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
index 09964498d83..e15af09d7f1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
@@ -26,14 +26,12 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
   private static final Logger LOG = LoggerFactory.getLogger(PerSubscriptionPartitionSdf.class);
-  private final Duration maxSleepTime;
   private final ManagedBacklogReaderFactory backlogReaderFactory;
   private final SubscriptionPartitionProcessorFactory processorFactory;
   private final SerializableFunction<SubscriptionPartition, InitialOffsetReader>
@@ -43,14 +41,12 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
   private final SerializableFunction<SubscriptionPartition, BlockingCommitter> committerFactory;
 
   PerSubscriptionPartitionSdf(
-      Duration maxSleepTime,
       ManagedBacklogReaderFactory backlogReaderFactory,
       SerializableFunction<SubscriptionPartition, InitialOffsetReader> offsetReaderFactory,
       SerializableBiFunction<TopicBacklogReader, OffsetByteRange, TrackerWithProgress>
           trackerFactory,
       SubscriptionPartitionProcessorFactory processorFactory,
       SerializableFunction<SubscriptionPartition, BlockingCommitter> committerFactory) {
-    this.maxSleepTime = maxSleepTime;
     this.backlogReaderFactory = backlogReaderFactory;
     this.processorFactory = processorFactory;
     this.offsetReaderFactory = offsetReaderFactory;
@@ -89,7 +85,7 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
     LOG.debug("Starting process for {} at {}", subscriptionPartition, Instant.now());
     SubscriptionPartitionProcessor processor =
         processorFactory.newProcessor(subscriptionPartition, tracker, receiver);
-    ProcessContinuation result = processor.runFor(maxSleepTime);
+    ProcessContinuation result = processor.run();
     LOG.debug("Starting commit for {} at {}", subscriptionPartition, Instant.now());
     // TODO(dpcollins-google): Move commits to a bundle finalizer for drain correctness
     processor
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
index 97c2899545e..4ecdf7a3c3b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.joda.time.Duration;
 
 public class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
   private static final long MEBIBYTE = 1L << 20;
@@ -144,8 +143,6 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
     return subscriptionPartitions.apply(
         ParDo.of(
             new PerSubscriptionPartitionSdf(
-                // Ensure we read for at least 5 seconds more than the bundle timeout.
-                options.minBundleTimeout().plus(Duration.standardSeconds(5)),
                 new ManagedBacklogReaderFactoryImpl(this::newBacklogReader),
                 this::newInitialOffsetReader,
                 this::newRestrictionTracker,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor.java
index 96f9cd7a491..768f6284c7d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessor.java
@@ -20,10 +20,9 @@ package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 import com.google.cloud.pubsublite.Offset;
 import java.util.Optional;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
-import org.joda.time.Duration;
 
 interface SubscriptionPartitionProcessor {
-  ProcessContinuation runFor(Duration duration);
+  ProcessContinuation run();
 
   Optional<Offset> lastClaimed();
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
index 818ccf7f7fa..0e8ad9fb7d7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
@@ -17,21 +17,14 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,42 +52,24 @@ class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcess
   }
 
   @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant maxReadTime = Instant.now().plus(duration);
-    while (subscriber.isRunning()) {
-      // Read any available data.
-      for (Optional<SequencedMessage> next = subscriber.peek();
-          next.isPresent();
-          next = subscriber.peek()) {
-        SequencedMessage message = next.get();
-        Offset messageOffset = Offset.of(message.getCursor().getOffset());
-        if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, message.getSizeBytes()))) {
-          subscriber.pop();
-          lastClaimedOffset = Optional.of(messageOffset);
-          receiver.outputWithTimestamp(
-              message, new Instant(Timestamps.toMillis(message.getPublishTime())));
-        } else {
-          // Our claim failed, return stop()
-          return ProcessContinuation.stop();
-        }
-      }
-      // Try waiting for new data.
-      try {
-        Duration readTime = new Duration(Instant.now(), maxReadTime);
-        Future<Void> onData = subscriber.onData();
-        checkArgumentNotNull(onData);
-        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
-      } catch (TimeoutException e) {
-        // Read timed out without us being cut off, yield to the runtime.
-        return ProcessContinuation.resume();
-      } catch (InterruptedException | ExecutionException e2) {
-        // We should never be interrupted by beam, and onData should never return an error.
-        throw new RuntimeException(e2);
+  public ProcessContinuation run() {
+    // Read any available data.
+    for (Optional<SequencedMessage> next = subscriber.peek();
+        next.isPresent();
+        next = subscriber.peek()) {
+      SequencedMessage message = next.get();
+      Offset messageOffset = Offset.of(message.getCursor().getOffset());
+      if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, message.getSizeBytes()))) {
+        subscriber.pop();
+        lastClaimedOffset = Optional.of(messageOffset);
+        receiver.outputWithTimestamp(
+            message, new Instant(Timestamps.toMillis(message.getPublishTime())));
+      } else {
+        // Our claim failed, return stop()
+        return ProcessContinuation.stop();
       }
     }
-    // Subscriber is no longer running, it has likely failed. Yield to the runtime to retry reading
-    // with a new subscriber.
+    // There is no more data available, yield to the runtime.
     return ProcessContinuation.resume();
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
index c02bd97241a..729c51cf937 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java
@@ -194,13 +194,7 @@ public class ReadWriteIT {
         pipeline.apply(
             "readMessages",
             PubsubLiteIO.read(
-                SubscriberOptions.newBuilder()
-                    .setSubscriptionPath(subscriptionPath)
-                    // setMinBundleTimeout INTENDED FOR TESTING ONLY
-                    // This sacrifices efficiency to make tests run faster. Do not use this in a
-                    // real pipeline!
-                    .setMinBundleTimeout(Duration.standardSeconds(5))
-                    .build()));
+                SubscriberOptions.newBuilder().setSubscriptionPath(subscriptionPath).build()));
     return messages;
     // TODO(BEAM-13230): Fix and re-enable
     // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing.
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java
index 31eae3ca550..456b5b4bd56 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java
@@ -39,7 +39,6 @@ import com.google.cloud.pubsublite.internal.wire.Subscriber;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import java.util.List;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -157,21 +156,7 @@ public class MemoryBufferedSubscriberImplTest {
   }
 
   @Test
-  public void onDataSatisfiedOnShutdown() throws Exception {
-    Future<Void> onData = bufferedSubscriber.onData();
-    bufferedSubscriber.stopAsync();
-    onData.get();
-  }
-
-  @Test
-  public void onDataSatisfiedOnError() throws Exception {
-    Future<Void> onData = bufferedSubscriber.onData();
-    subscriber.fail(new RuntimeException("bad"));
-    onData.get();
-  }
-
-  @Test
-  public void onDataSatisfiedOnData() throws Exception {
+  public void dataAvailableToCaller() {
     SequencedMessage message1 =
         SequencedMessage.newBuilder()
             .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value() + 10))
@@ -182,19 +167,14 @@ public class MemoryBufferedSubscriberImplTest {
             .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value() + 20))
             .setSizeBytes(1)
             .build();
-    Future<Void> onData = bufferedSubscriber.onData();
-    assertFalse(onData.isDone());
+    assertFalse(bufferedSubscriber.peek().isPresent());
     consumer.accept(ImmutableList.of(message1, message2));
-    onData.get();
-    assertTrue(bufferedSubscriber.onData().isDone()); // Still messages, onData is satisfied.
     assertEquals(bufferedSubscriber.fetchOffset(), example(Offset.class));
     assertEquals(bufferedSubscriber.peek().get(), message1);
     bufferedSubscriber.pop();
     assertEquals(bufferedSubscriber.fetchOffset(), Offset.of(message1.getCursor().getOffset() + 1));
-    assertTrue(bufferedSubscriber.onData().isDone()); // Still messages, onData is satisfied.
     assertEquals(bufferedSubscriber.peek().get(), message2);
     bufferedSubscriber.pop();
     assertEquals(bufferedSubscriber.fetchOffset(), Offset.of(message2.getCursor().getOffset() + 1));
-    assertFalse(bufferedSubscriber.onData().isDone());
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest.java
index 676de2fc11d..9aca9724c97 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdfTest.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
-import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -63,8 +62,6 @@ import org.mockito.Spy;
 @RunWith(JUnit4.class)
 @SuppressWarnings("initialization.fields.uninitialized")
 public class PerSubscriptionPartitionSdfTest {
-  private static final Duration MAX_SLEEP_TIME =
-      Duration.standardMinutes(10).plus(Duration.millis(10));
   private static final OffsetByteRange RESTRICTION =
       OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0);
   private static final SubscriptionPartition PARTITION =
@@ -100,7 +97,6 @@ public class PerSubscriptionPartitionSdfTest {
     when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader);
     sdf =
         new PerSubscriptionPartitionSdf(
-            MAX_SLEEP_TIME,
             backlogReaderFactory,
             offsetReaderFactory,
             trackerFactory,
@@ -139,7 +135,7 @@ public class PerSubscriptionPartitionSdfTest {
   @Test
   @SuppressWarnings("argument.type.incompatible")
   public void process() throws Exception {
-    when(processor.runFor(MAX_SLEEP_TIME)).thenReturn(ProcessContinuation.resume());
+    when(processor.run()).thenReturn(ProcessContinuation.resume());
     when(processorFactory.newProcessor(any(), any(), any()))
         .thenAnswer(
             args -> {
@@ -154,7 +150,7 @@ public class PerSubscriptionPartitionSdfTest {
     assertEquals(ProcessContinuation.resume(), sdf.processElement(tracker, PARTITION, output));
     verify(processorFactory).newProcessor(eq(PARTITION), any(), eq(output));
     InOrder order = inOrder(processor);
-    order.verify(processor).runFor(MAX_SLEEP_TIME);
+    order.verify(processor).run();
     order.verify(processor).lastClaimed();
     InOrder order2 = inOrder(committerFactory, committer);
     order2.verify(committerFactory).apply(PARTITION);
@@ -178,7 +174,6 @@ public class PerSubscriptionPartitionSdfTest {
     ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream());
     output.writeObject(
         new PerSubscriptionPartitionSdf(
-            MAX_SLEEP_TIME,
             new NoopManagedBacklogReaderFactory(),
             x -> null,
             (x, y) -> null,
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java
index 679861b0372..c07682bb238 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java
@@ -31,8 +31,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
-import com.google.api.core.ApiFutures;
-import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.pubsublite.Offset;
@@ -49,7 +47,6 @@ import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
@@ -96,7 +93,6 @@ public class SubscriptionPartitionProcessorImplTest {
     when(tracker.currentRestriction()).thenReturn(initialRange());
     doReturn(true).when(subscriber).isRunning();
     doReturn(example(Offset.class)).when(subscriber).fetchOffset();
-    doReturn(SettableApiFuture.create()).when(subscriber).onData();
   }
 
   private SubscriptionPartitionProcessor newProcessor() {
@@ -106,7 +102,7 @@ public class SubscriptionPartitionProcessorImplTest {
   @Test
   public void lifecycle() {
     SubscriptionPartitionProcessor processor = newProcessor();
-    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.millis(10)));
+    assertEquals(ProcessContinuation.resume(), processor.run());
     InOrder order = inOrder(subscriberFactory, subscriber);
     order.verify(subscriberFactory).get();
     order.verify(subscriber).fetchOffset();
@@ -120,7 +116,7 @@ public class SubscriptionPartitionProcessorImplTest {
     doThrow(new RuntimeException("Ignored")).when(badSubscriber).awaitTerminated();
     doReturn(badSubscriber, subscriber).when(subscriberFactory).get();
     SubscriptionPartitionProcessor processor = newProcessor();
-    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.millis(10)));
+    assertEquals(ProcessContinuation.resume(), processor.run());
     InOrder order = inOrder(subscriberFactory, badSubscriber, subscriber);
     order.verify(subscriberFactory).get();
     order.verify(badSubscriber).fetchOffset();
@@ -137,30 +133,14 @@ public class SubscriptionPartitionProcessorImplTest {
     assertThrows(ApiException.class, this::newProcessor);
   }
 
-  @Test
-  public void subscriberFailureReturnsResume() throws Exception {
-    SubscriptionPartitionProcessor processor = newProcessor();
-    doReturn(ApiFutures.immediateFuture(null)).when(subscriber).onData();
-    doReturn(false).when(subscriber).isRunning();
-    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.standardHours(1)));
-  }
-
-  @Test
-  public void timeoutReturnsResume() {
-    SubscriptionPartitionProcessor processor = newProcessor();
-    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.millis(10)));
-    assertFalse(processor.lastClaimed().isPresent());
-  }
-
   @Test
   public void failedClaimCausesStop() {
     SubscriptionPartitionProcessor processor = newProcessor();
 
     when(tracker.tryClaim(any())).thenReturn(false);
-    doReturn(ApiFutures.immediateFuture(null)).when(subscriber).onData();
     doReturn(Optional.of(messageWithOffset(1))).when(subscriber).peek();
 
-    assertEquals(ProcessContinuation.stop(), processor.runFor(Duration.standardHours(10)));
+    assertEquals(ProcessContinuation.stop(), processor.run());
 
     verify(tracker, times(1)).tryClaim(any());
     verify(subscriber, times(0)).pop();
@@ -168,11 +148,8 @@ public class SubscriptionPartitionProcessorImplTest {
   }
 
   @Test
-  public void successfulClaimThenTimeout() {
+  public void successfulClaimsThenNoMoreMessagesFromSubscriber() {
     doReturn(true).when(tracker).tryClaim(any());
-    doReturn(ApiFutures.immediateFuture(null), SettableApiFuture.create())
-        .when(subscriber)
-        .onData();
 
     SequencedMessage message1 = messageWithOffset(1);
     SequencedMessage message3 = messageWithOffset(3);
@@ -181,7 +158,7 @@ public class SubscriptionPartitionProcessorImplTest {
         .peek();
 
     SubscriptionPartitionProcessor processor = newProcessor();
-    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.standardSeconds(3)));
+    assertEquals(ProcessContinuation.resume(), processor.run());
 
     InOrder order = inOrder(tracker, receiver);
     order.verify(tracker).tryClaim(OffsetByteProgress.of(Offset.of(1), message1.getSizeBytes()));
diff --git a/sdks/python/apache_beam/io/gcp/pubsublite/external.py b/sdks/python/apache_beam/io/gcp/pubsublite/external.py
index d5a71cfb9dc..a0e46c1b4d8 100644
--- a/sdks/python/apache_beam/io/gcp/pubsublite/external.py
+++ b/sdks/python/apache_beam/io/gcp/pubsublite/external.py
@@ -29,9 +29,7 @@ from apache_beam.transforms.external import ExternalTransform
 from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
 
 _ReadSchema = typing.NamedTuple(
-    '_ReadSchema',
-    [('subscription_path', str), ('min_bundle_timeout', int),
-     ('deduplicate', bool)])
+    '_ReadSchema', [('subscription_path', str), ('deduplicate', bool)])
 
 
 def _default_io_expansion_service():
@@ -51,7 +49,6 @@ class _ReadExternal(ExternalTransform):
   def __init__(
       self,
       subscription_path,
-      min_bundle_timeout=None,
       deduplicate=None,
       expansion_service=None,
   ):
@@ -61,15 +58,9 @@ class _ReadExternal(ExternalTransform):
 
     Args:
       subscription_path: A Pub/Sub Lite Subscription path.
-      min_bundle_timeout: The minimum wall time to pass before allowing
-          bundle closure. Setting this to too small of a value will result in
-          increased compute costs and lower throughput per byte. Immediate
-          timeouts (0) may be useful for testing.
       deduplicate: Whether to deduplicate messages based on the value of
           the 'x-goog-pubsublite-dataflow-uuid' attribute.
     """
-    if min_bundle_timeout is None:
-      min_bundle_timeout = 60 * 1000
     if deduplicate is None:
       deduplicate = False
     if expansion_service is None:
@@ -78,9 +69,7 @@ class _ReadExternal(ExternalTransform):
         'beam:transform:org.apache.beam:pubsublite_read:v1',
         NamedTupleBasedPayloadBuilder(
             _ReadSchema(
-                subscription_path=subscription_path,
-                min_bundle_timeout=min_bundle_timeout,
-                deduplicate=deduplicate)),
+                subscription_path=subscription_path, deduplicate=deduplicate)),
         expansion_service)
 
 
diff --git a/sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py b/sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
index 43cb1c6604d..a8e3defc4f9 100644
--- a/sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
+++ b/sdks/python/apache_beam/io/gcp/pubsublite/proto_api.py
@@ -37,7 +37,6 @@ class ReadFromPubSubLite(PTransform):
   def __init__(
       self,
       subscription_path,
-      min_bundle_timeout=None,
       deduplicate=None,
       expansion_service=None,
   ):
@@ -46,17 +45,12 @@ class ReadFromPubSubLite(PTransform):
     Args:
       subscription_path: Pub/Sub Lite Subscription in the form
           projects/<project>/locations/<location>/subscriptions/<subscription>
-      min_bundle_timeout: The minimum wall time to pass before allowing
-          bundle closure. Setting this to too small of a value will result in
-          increased compute costs and lower throughput per byte. Immediate
-          timeouts (0) may be useful for testing.
       deduplicate: Whether to deduplicate messages based on the value of
           the 'x-goog-pubsublite-dataflow-uuid' attribute. Defaults to False.
     """
     super().__init__()
     self._source = _ReadExternal(
         subscription_path=subscription_path,
-        min_bundle_timeout=min_bundle_timeout,
         deduplicate=deduplicate,
         expansion_service=expansion_service,
     )