Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/21 10:09:52 UTC

[GitHub] [beam] scwhittle commented on a change in pull request #17125: [BEAM-14129] Restructure PubsubLiteIO Read side to produce smaller bundles

scwhittle commented on a change in pull request #17125:
URL: https://github.com/apache/beam/pull/17125#discussion_r830897695



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ProxyService;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryLimiter.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBufferedSubscriber {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryBufferedSubscriberImpl.class);
+
+  private final Partition partition;
+  private final MemoryLimiter limiter;
+  private final Subscriber subscriber;
+  private final long maxMemory;
+  private long targetMemory;
+  private Offset fetchOffset;
+  private Block memBlock;
+
+  private long bytesOutstandingToServer = 0;
+  private long bytesOutstanding = 0;
+  private final Queue<SequencedMessage> messages = new ArrayDeque<>();
+  private SettableApiFuture<Void> newData = SettableApiFuture.create();
+  private boolean shutdown = false;
+
+  @SuppressWarnings({"methodref.receiver.bound.invalid", "method.invocation.invalid"})

Review comment:
    Why are these safe to ignore? Add a comment.
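
    For illustration, the requested justification might look like this (a sketch;
    the stated reason is an assumption for the author to confirm):

        // Safe to suppress: the method references passed to ProxyService in the
        // constructor are not invoked until the service is started, which happens
        // after construction completes, so the receiver-bound warnings are false
        // positives here.
        @SuppressWarnings({"methodref.receiver.bound.invalid", "method.invocation.invalid"})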

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java
##########
@@ -76,97 +57,47 @@ public IsBounded isBounded() {
 
   @Override
   public boolean tryClaim(OffsetByteProgress position) {
-    long toClaim = position.lastOffset().value();
-    checkArgument(
-        lastClaimed == null || toClaim > lastClaimed,
-        "Trying to claim offset %s while last attempted was %s",
-        position.lastOffset().value(),
-        lastClaimed);
-    checkArgument(
-        toClaim >= range.getRange().getFrom(),
-        "Trying to claim offset %s before start of the range %s",
-        toClaim,
-        range);
-    // split() has already been called, truncating this range. No more offsets may be claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
-      boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
-      boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
-      checkState(
-          isRangeEmpty || isValidClosedRange,
-          "Violated class precondition: offset range improperly split. Please report a beam bug.");
-      return false;
-    }
-    lastClaimed = toClaim;
-    range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
+    if (!rangeTracker.tryClaim(position.lastOffset().value())) return false;
+    lastClaimed = position.lastOffset().value();
+    bytes += position.batchBytes();
     return true;
   }
 
   @Override
   public OffsetByteRange currentRestriction() {
-    return range;
+    return OffsetByteRange.of(rangeTracker.currentRestriction(), bytes);
   }
 
   private long nextOffset() {
     checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE);
     return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
   }
 
-  /**
-   * Whether the tracker has received enough data/been running for enough time that it can
-   * checkpoint and be confident it can get sufficient throughput.
-   */
-  private boolean receivedEnough() {
-    Duration duration = Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-    if (duration.isLongerThan(minTrackingTime)) {
-      return true;
-    }
-    if (currentRestriction().getByteCount() >= minBytesReceived) {
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
     // Cannot split a bounded range. This should already be completely claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
+    if (rangeTracker.currentRestriction().getTo() != Long.MAX_VALUE) {
       return null;
     }
-    if (!receivedEnough()) {
+    @Nullable SplitResult<OffsetRange> ranges = rangeTracker.trySplit(fractionOfRemainder);
+    if (ranges == null) {
       return null;
     }
-    range =
-        OffsetByteRange.of(
-            new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
-            range.getByteCount());
+    checkArgument(rangeTracker.currentRestriction().equals(ranges.getPrimary()));
     return SplitResult.of(
-        this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
+        currentRestriction(), OffsetByteRange.of(checkArgumentNotNull(ranges.getResidual())));
   }
 
   @Override
   @SuppressWarnings("unboxing.of.nullable")

Review comment:
    Remove this suppression.
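
    One way to make it unnecessary (a sketch, assuming the warning comes from
    unboxing the @Nullable lastClaimed field): snapshot the field into a local so
    the checker can tie the null test to the unboxing.

        private long nextOffset() {
          Long claimed = lastClaimed; // local copy of the @Nullable field
          checkState(claimed == null || claimed < Long.MAX_VALUE);
          return claimed == null ? currentRestriction().getRange().getFrom() : claimed + 1;
        }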

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryLimiterImpl implements MemoryLimiter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryLimiterImpl.class);
+  private final long minBlockSize;
+
+  @GuardedBy("this")
+  private long available;
+
+  public MemoryLimiterImpl(long minBlockSize, long maxAvailable) {
+    this.minBlockSize = minBlockSize;
+    this.available = maxAvailable;
+  }
+
+  @Override
+  public synchronized Block claim(long toAcquire) {
+    toAcquire = Math.max(Math.min(toAcquire, available / 2), minBlockSize);
+    available -= toAcquire;
+    return new Block(toAcquire);
+  }
+
+  @Override
+  public long getMinBlockSize() {
+    return minBlockSize;
+  }
+
+  private synchronized void release(long toRelease) {
+    available += toRelease;

Review comment:
    Assert that available does not exceed the maximum after adding toRelease?
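
    A sketch of the suggested check, assuming the constructor also keeps the
    maximum in a maxAvailable field (hypothetical; the class currently stores
    only the remaining available balance):

        private synchronized void release(long toRelease) {
          available += toRelease;
          // Releasing more than was claimed indicates a bookkeeping bug.
          checkState(available <= maxAvailable, "Released more bytes than were claimed.");
        }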

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
##########
@@ -61,8 +64,8 @@ public void teardown() {
   }
 
   @GetInitialWatermarkEstimatorState
-  public Instant getInitialWatermarkState() {
-    return Instant.EPOCH;
+  public Instant getInitialWatermarkState(@Timestamp Instant inputTs) {

Review comment:
    Add a comment on why this is necessary, since it was tricky to get right.
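
    For example, the comment could capture the rationale along these lines (a
    sketch; both the body and the reasoning are assumptions for the author to
    confirm, since the diff cuts off here):

        @GetInitialWatermarkEstimatorState
        public Instant getInitialWatermarkState(@Timestamp Instant inputTs) {
          // Initialize the watermark to the input element's timestamp rather than
          // EPOCH: a fresh EPOCH estimate would hold the output watermark back
          // whenever a partition's restriction is rescheduled.
          return inputTs;
        }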

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
##########
@@ -38,12 +39,16 @@
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
 import org.joda.time.Duration;
 
 public class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
   private static final long MEBIBYTE = 1L << 20;
+  private static final long SOFT_MEMORY_LIMIT = 512 * MEBIBYTE;

Review comment:
    Consider making these configurable as pipeline options.
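
    For example, a sketch of what that could look like (hypothetical option names,
    not part of this PR):

        public interface PubsubLiteReadOptions extends PipelineOptions {
          @Description("Soft limit, in bytes, on memory used to buffer Pub/Sub Lite messages.")
          @Default.Long(512L * 1024 * 1024)
          Long getPubsubLiteSoftMemoryLimit();

          void setPubsubLiteSoftMemoryLimit(Long value);
        }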

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryLimiterImpl implements MemoryLimiter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryLimiterImpl.class);
+  private final long minBlockSize;
+
+  @GuardedBy("this")
+  private long available;
+
+  public MemoryLimiterImpl(long minBlockSize, long maxAvailable) {
+    this.minBlockSize = minBlockSize;
+    this.available = maxAvailable;
+  }
+
+  @Override
+  public synchronized Block claim(long toAcquire) {
+    toAcquire = Math.max(Math.min(toAcquire, available / 2), minBlockSize);
+    available -= toAcquire;
+    return new Block(toAcquire);
+  }
+
+  @Override
+  public long getMinBlockSize() {
+    return minBlockSize;
+  }
+
+  private synchronized void release(long toRelease) {
+    available += toRelease;
+  }
+
+  public class Block implements MemoryLimiter.Block {
+    public final long claimed;
+    private boolean released = false;
+
+    private Block(long claimed) {
+      this.claimed = claimed;
+    }
+
+    @Override
+    public long claimed() {
+      return claimed;
+    }
+
+    @Override
+    public void close() {
+      release(claimed);

Review comment:
    Assert that the block has not already been released?
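
    A sketch of the suggested guard, reusing the existing released flag:

        @Override
        public synchronized void close() {
          checkState(!released, "Block closed more than once.");
          released = true;
          release(claimed);
        }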

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.

Review comment:
    Would it be better to rearrange the loop so that you try to peek before
    awaiting onData? It seems better to fall back to waiting on the notification
    only when necessary, i.e. when no buffered data is present.
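
    A sketch of that reordering (same calls as the current loop, just draining
    buffered data before blocking):

        while (subscriber.isRunning()) {
          Optional<SequencedMessage> next = subscriber.peek();
          if (!next.isPresent()) {
            // No buffered data: block until more arrives or the read time expires.
            try {
              long waitMillis = new Duration(Instant.now(), maxReadTime).getMillis();
              subscriber.onData().get(waitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
              return ProcessContinuation.resume();
            } catch (InterruptedException | ExecutionException e) {
              throw new RuntimeException(e);
            }
            continue;
          }
          // ... tryClaim, pop, and output as in the current loop ...
        }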

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")

Review comment:
    Can this suppression be removed?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.
+      for (Optional<SequencedMessage> next = subscriber.peek();
+          next.isPresent();
+          next = subscriber.peek()) {
+        SequencedMessage message = next.get();
+        System.err.println("Next: " + message);
+        Offset messageOffset = Offset.of(message.getCursor().getOffset());
+        if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, message.getSizeBytes()))) {
+          subscriber.pop();
+          lastClaimedOffset = Optional.of(messageOffset);
+          receiver.outputWithTimestamp(
+              message, new Instant(Timestamps.toMillis(message.getPublishTime())));
+        } else {
+          // Our claim failed, return stop()
+          return ProcessContinuation.stop();
         }
       }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-    // Determine return code after shutdown.
-    if (completionFuture.isDone()) {
-      // Call get() to ensure there is no exception.
-      try {
-        completionFuture.get();
-      } catch (Throwable t) {
-        throw ExtractStatus.toCanonical(t).underlying;
-      }
-      // CompletionFuture set with null when tryClaim returned false.
-      return ProcessContinuation.stop();
     }
+    // We were interrupted,
     return ProcessContinuation.resume();
   }
 
-  @Override
-  public void close() {
-    try {
-      blockingShutdown(subscriber);
-    } catch (Throwable t) {
-      // Don't propagate errors on subscriber shutdown.
-      LOG.info("Error on subscriber shutdown.", t);
-    }
-  }
-
   @Override
   public Optional<Offset> lastClaimed() {
     return lastClaimedOffset;
   }
+
+  private MemoryBufferedSubscriber getReadySubscriber(
+      Supplier<MemoryBufferedSubscriber> getOrCreate) {
+    Offset startOffset = Offset.of(tracker.currentRestriction().getRange().getFrom());
+    MemoryBufferedSubscriber subscriber = getOrCreate.get();
+    Offset fetchOffset = subscriber.fetchOffset();
+    while (!startOffset.equals(fetchOffset)) {
+      LOG.info(
+          "Discarding subscriber due to mismatch, this should be rare. {}, start: {} fetch: {}",
+          subscriptionPartition,
+          startOffset,
+          fetchOffset);
+      try {
+        subscriber.stopAsync().awaitTerminated();
+      } catch (Exception ignored) {
+      }
+      subscriber = getOrCreate.get();
+      fetchOffset = subscriber.fetchOffset();

Review comment:
    nit: I think it would be less error prone to assign subscriber/fetchOffset in only one spot:

        while (true) {
          MemoryBufferedSubscriber subscriber = getOrCreate.get();
          Offset fetchOffset = subscriber.fetchOffset();
          if (startOffset.equals(fetchOffset)) {
            subscriber.rebuffer();
            return subscriber;
          }
          // Log the mismatch, stop this subscriber, and loop to create a new one.
        }

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
##########
@@ -61,8 +64,8 @@ public void teardown() {
   }
 
   @GetInitialWatermarkEstimatorState
-  public Instant getInitialWatermarkState() {
-    return Instant.EPOCH;
+  public Instant getInitialWatermarkState(@Timestamp Instant inputTs) {

Review comment:
    Can this be unit tested as well? Or is that tricky because it depends on the
    Watch watermark interaction?
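
    A direct unit test might be enough (a sketch with hypothetical names, assuming
    the method simply propagates the element timestamp):

        @Test
        public void initialWatermarkStateMatchesElementTimestamp() {
          Instant ts = new Instant(123L);
          assertEquals(ts, sdf.getInitialWatermarkState(ts));
        }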

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiService;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.Optional;
+
+interface MemoryBufferedSubscriber extends ApiService {
+
+  /**
+   * Get the current fetch offset of this subscriber. This offset will be less than or equal to all
+   * future messages returned by this object.
+   */
+  Offset fetchOffset();
+
+  /**
+   * Notify this subscriber that all previously in-memory messages are now no longer taking up space
+   * in this JVM.

Review comment:
    This doesn't seem to be true, and it prescribes one particular usage, but
    there seem to be other valid uses.

    How about saying that previously popped messages will no longer count against
    the memory budget?
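
    For example (a sketch of the rewording; the method name is inferred from the
    discussion above and should be checked against the interface):

        /**
         * Notify this subscriber that previously popped messages no longer count
         * against the memory budget, so it may request more data from the server.
         */
        void rebuffer();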

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
##########
@@ -17,161 +17,109 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
   @SuppressWarnings("methodref.receiver.bound.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
   @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
+  public ProcessContinuation runFor(Duration duration) {
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
       try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never return an error.
+        throw new RuntimeException(e2);
       }
-    } else {
-      completionFuture.set(null);
-    }
-  }
-
-  @Override
-  @SuppressWarnings("argument.type.incompatible")
-  public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+      // Read any available data.
+      for (Optional<SequencedMessage> next = subscriber.peek();
+          next.isPresent();
+          next = subscriber.peek()) {
+        SequencedMessage message = next.get();
+        System.err.println("Next: " + message);
+        Offset messageOffset = Offset.of(message.getCursor().getOffset());
+        if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, message.getSizeBytes()))) {
+          subscriber.pop();
+          lastClaimedOffset = Optional.of(messageOffset);
+          receiver.outputWithTimestamp(
+              message, new Instant(Timestamps.toMillis(message.getPublishTime())));
+        } else {
+          // Our claim failed, return stop()
+          return ProcessContinuation.stop();
         }
       }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-    // Determine return code after shutdown.
-    if (completionFuture.isDone()) {
-      // Call get() to ensure there is no exception.
-      try {
-        completionFuture.get();
-      } catch (Throwable t) {
-        throw ExtractStatus.toCanonical(t).underlying;
-      }
-      // CompletionFuture set with null when tryClaim returned false.
-      return ProcessContinuation.stop();
     }
+    // We were interrupted,

Review comment:
    This made me think of InterruptedException, but it actually seems to be due to
    the subscriber no longer running. Reword for clarity.
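
    For example (a suggested rewording, not taken from the PR):

        // The subscriber is no longer running; resume later so a new subscriber
        // can be created for the remaining restriction.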




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org