Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/05 00:23:00 UTC

[GitHub] [beam] boyuanzz commented on a change in pull request #13470: [BEAM-10114] Convert PubsubLiteIO read to use SplittableDoFn.

boyuanzz commented on a change in pull request #13470:
URL: https://github.com/apache/beam/pull/13470#discussion_r536433894



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+
+public class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
+    implements HasProgress {
+  private final TopicBacklogReader backlogReader;
+  private OffsetRange range;
+  private @Nullable Long lastClaimed;
+  private long byteCount = 0;
+
+  public OffsetByteRangeTracker(OffsetRange range, TopicBacklogReader backlogReader) {
+    checkArgument(range.getTo() == Long.MAX_VALUE);
+    this.backlogReader = backlogReader;
+    this.range = range;
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  @Override
+  public boolean tryClaim(OffsetByteProgress position) {
+    long toClaim = position.lastOffset().value();
+    checkArgument(
+        lastClaimed == null || toClaim > lastClaimed,
+        "Trying to claim offset %s while last attempted was %s",
+        position.lastOffset().value(),
+        lastClaimed);
+    checkArgument(
+        toClaim >= range.getFrom(),
+        "Trying to claim offset %s before start of the range %s",
+        toClaim,
+        range);
+    // split() has already been called, truncating this range. No more offsets may be claimed.
+    if (range.getTo() != Long.MAX_VALUE) {
+      boolean isRangeEmpty = range.getTo() == range.getFrom();
+      boolean isValidClosedRange = nextOffset() == range.getTo();
+      checkState(
+          isRangeEmpty || isValidClosedRange,
+          "Violated class precondition: offset range improperly split. Please report a beam bug.");
+      return false;
+    }
+    lastClaimed = toClaim;
+    byteCount += position.batchBytes();
+    return true;
+  }
+
+  @Override
+  public OffsetRange currentRestriction() {
+    return range;
+  }
+
+  private long nextOffset() {
+    return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;

Review comment:
       If `lastClaimed == Long.MAX_VALUE`, you will get overflow here.
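To make the concern concrete, here is a minimal sketch. It assumes nothing about the final fix: `NextOffsetSketch`, `uncheckedNext` and `checkedNext` are hypothetical names that do not appear in the PR. The first method mirrors the expression in `nextOffset()` and wraps silently; the second uses `Math.addExact`, which is one possible guard among several.

```java
// Hypothetical helper, not part of the PR: shows the wraparound and one possible guard.
final class NextOffsetSketch {
  // Same expression as nextOffset() in the diff: wraps to Long.MIN_VALUE at Long.MAX_VALUE.
  static long uncheckedNext(Long lastClaimed, long from) {
    return lastClaimed == null ? from : lastClaimed + 1;
  }

  // Possible guard: Math.addExact throws ArithmeticException instead of wrapping.
  static long checkedNext(Long lastClaimed, long from) {
    return lastClaimed == null ? from : Math.addExact(lastClaimed, 1L);
  }

  public static void main(String[] args) {
    // The unchecked version silently produces a negative "next" offset.
    System.out.println(uncheckedNext(Long.MAX_VALUE, 0L) == Long.MIN_VALUE); // prints true
    try {
      checkedNext(Long.MAX_VALUE, 0L);
    } catch (ArithmeticException e) {
      System.out.println("overflow detected"); // prints "overflow detected"
    }
  }
}
```

Rejecting a claim of `Long.MAX_VALUE` up front with `checkArgument` in `tryClaim` would be another way to keep the sentinel value out of `lastClaimed`.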

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerPartitionSdf.java
##########
@@ -0,0 +1,139 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.PullSubscriber;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.flogger.GoogleLogger;
+import com.google.protobuf.util.Timestamps;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Sleeper;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class PerPartitionSdf extends DoFn<Partition, SequencedMessage> {

Review comment:
       I think you may also want to track the watermark by implementing the watermark-related APIs: https://beam.apache.org/documentation/programming-guide/#watermark-estimation
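As a rough illustration of the idea only, the plain-Java class below tracks a watermark that never moves backwards as publish times are observed. It deliberately does not use the Beam watermark estimator API described at the link above; `MonotonicWatermarkSketch` and its methods are hypothetical names, and `java.time.Instant` stands in for the Joda `Instant` used in the PR.

```java
import java.time.Instant;

// Hypothetical stand-in, not the Beam API: keeps the largest publish time seen so far.
final class MonotonicWatermarkSketch {
  private Instant watermark;

  MonotonicWatermarkSketch(Instant initial) {
    this.watermark = initial;
  }

  // Advances the watermark only when the observed publish time is later.
  void observe(Instant publishTime) {
    if (publishTime.isAfter(watermark)) {
      watermark = publishTime;
    }
  }

  Instant current() {
    return watermark;
  }

  public static void main(String[] args) {
    MonotonicWatermarkSketch w = new MonotonicWatermarkSketch(Instant.ofEpochMilli(0));
    w.observe(Instant.ofEpochMilli(100));
    w.observe(Instant.ofEpochMilli(50)); // An older publish time does not move it back.
    System.out.println(w.current().toEpochMilli()); // prints 100
  }
}
```

In the SDF itself, the equivalent would presumably be fed from `message.getPublishTime()` at the point where messages are output.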

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerPartitionSdf.java
##########
@@ -0,0 +1,139 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.PullSubscriber;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.flogger.GoogleLogger;
+import com.google.protobuf.util.Timestamps;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Sleeper;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class PerPartitionSdf extends DoFn<Partition, SequencedMessage> {
+  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+  private final Duration maxSleepTime;
+  private final SerializableBiFunction<Partition, Offset, PullSubscriber<SequencedMessage>>
+      subscriberFactory;
+  private final SerializableFunction<Partition, Committer> committerFactory;
+  private final SerializableSupplier<Sleeper> sleeperSupplier;
+  private final SerializableFunction<Partition, InitialOffsetReader> offsetReaderFactory;
+  private final SerializableBiFunction<
+          Partition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+      trackerFactory;
+
+  Duration sleepTimeRemaining;
+
+  PerPartitionSdf(
+      Duration maxSleepTime,
+      SerializableBiFunction<Partition, Offset, PullSubscriber<SequencedMessage>> subscriberFactory,
+      SerializableFunction<Partition, Committer> committerFactory,
+      SerializableSupplier<Sleeper> sleeperSupplier,
+      SerializableFunction<Partition, InitialOffsetReader> offsetReaderFactory,
+      SerializableBiFunction<
+              Partition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+          trackerFactory) {
+    this.maxSleepTime = maxSleepTime;
+    this.sleepTimeRemaining = maxSleepTime;
+    this.subscriberFactory = subscriberFactory;
+    this.committerFactory = committerFactory;
+    this.sleeperSupplier = sleeperSupplier;
+    this.offsetReaderFactory = offsetReaderFactory;
+    this.trackerFactory = trackerFactory;
+  }
+
+  private List<SequencedMessage> doPoll(PullSubscriber<SequencedMessage> subscriber)
+      throws Exception {
+    Sleeper sleeper = sleeperSupplier.get();
+    while (sleepTimeRemaining.isLongerThan(Duration.ZERO)) {
+      List<SequencedMessage> messages = subscriber.pull();
+      if (!messages.isEmpty()) {
+        return messages;
+      }
+      Duration sleepTime =
+          Collections.min(ImmutableList.of(sleepTimeRemaining, Duration.millis(50)));
+      sleepTimeRemaining = sleepTimeRemaining.minus(sleepTime);
+      sleeper.sleep(sleepTime.getMillis());
+    }
+    return ImmutableList.of();
+  }
+
+  @ProcessElement
+  public ProcessContinuation processElement(
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      @Element Partition partition,
+      OutputReceiver<SequencedMessage> receiver)
+      throws Exception {
+    logger.atInfo().log("Starting processing for partition " + partition);
+    sleepTimeRemaining = maxSleepTime;
+    Committer committer = committerFactory.apply(partition);
+    committer.startAsync().awaitRunning();
+    try (PullSubscriber<SequencedMessage> subscriber =
+        subscriberFactory.apply(partition, Offset.of(tracker.currentRestriction().getFrom()))) {
+      while (true) {
+        List<SequencedMessage> messages = doPoll(subscriber);
+        // We polled for as long as possible, yield to the runtime to allow it to reschedule us on
+        // a new task.
+        if (messages.isEmpty()) {
+          logger.atInfo().log("Yielding due to timeout on partition " + partition);
+          return ProcessContinuation.resume();
+        }
+        Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
+        long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
+        if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
+          messages.forEach(
+              message ->
+                  receiver.outputWithTimestamp(
+                      message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
+          committer.commitOffset(Offset.of(lastOffset.value() + 1)).get();

Review comment:
       What's the effect of committing an offset? Are we able to read from that offset again once it's committed?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerPartitionSdf.java
##########
@@ -0,0 +1,139 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.PullSubscriber;
+import com.google.cloud.pubsublite.internal.wire.Committer;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.flogger.GoogleLogger;
+import com.google.protobuf.util.Timestamps;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.SerializableBiFunction;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Sleeper;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class PerPartitionSdf extends DoFn<Partition, SequencedMessage> {
+  private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
+  private final Duration maxSleepTime;
+  private final SerializableBiFunction<Partition, Offset, PullSubscriber<SequencedMessage>>
+      subscriberFactory;
+  private final SerializableFunction<Partition, Committer> committerFactory;
+  private final SerializableSupplier<Sleeper> sleeperSupplier;
+  private final SerializableFunction<Partition, InitialOffsetReader> offsetReaderFactory;
+  private final SerializableBiFunction<
+          Partition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+      trackerFactory;
+
+  Duration sleepTimeRemaining;
+
+  PerPartitionSdf(
+      Duration maxSleepTime,
+      SerializableBiFunction<Partition, Offset, PullSubscriber<SequencedMessage>> subscriberFactory,
+      SerializableFunction<Partition, Committer> committerFactory,
+      SerializableSupplier<Sleeper> sleeperSupplier,
+      SerializableFunction<Partition, InitialOffsetReader> offsetReaderFactory,
+      SerializableBiFunction<
+              Partition, OffsetRange, RestrictionTracker<OffsetRange, OffsetByteProgress>>
+          trackerFactory) {
+    this.maxSleepTime = maxSleepTime;
+    this.sleepTimeRemaining = maxSleepTime;
+    this.subscriberFactory = subscriberFactory;
+    this.committerFactory = committerFactory;
+    this.sleeperSupplier = sleeperSupplier;
+    this.offsetReaderFactory = offsetReaderFactory;
+    this.trackerFactory = trackerFactory;
+  }
+
+  private List<SequencedMessage> doPoll(PullSubscriber<SequencedMessage> subscriber)
+      throws Exception {
+    Sleeper sleeper = sleeperSupplier.get();
+    while (sleepTimeRemaining.isLongerThan(Duration.ZERO)) {
+      List<SequencedMessage> messages = subscriber.pull();
+      if (!messages.isEmpty()) {
+        return messages;
+      }
+      Duration sleepTime =
+          Collections.min(ImmutableList.of(sleepTimeRemaining, Duration.millis(50)));
+      sleepTimeRemaining = sleepTimeRemaining.minus(sleepTime);
+      sleeper.sleep(sleepTime.getMillis());
+    }
+    return ImmutableList.of();
+  }
+
+  @ProcessElement
+  public ProcessContinuation processElement(
+      RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
+      @Element Partition partition,

Review comment:
       I'm wondering how a partition alone can locate a read; I would imagine we at least need a topic. Is it plumbed through by `subscriberFactory` at construction time?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
##########
@@ -0,0 +1,119 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+
+public class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
+    implements HasProgress {
+  private final TopicBacklogReader backlogReader;
+  private OffsetRange range;
+  private @Nullable Long lastClaimed;
+  private long byteCount = 0;
+
+  public OffsetByteRangeTracker(OffsetRange range, TopicBacklogReader backlogReader) {
+    checkArgument(range.getTo() == Long.MAX_VALUE);
+    this.backlogReader = backlogReader;
+    this.range = range;
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  @Override
+  public boolean tryClaim(OffsetByteProgress position) {
+    long toClaim = position.lastOffset().value();
+    checkArgument(
+        lastClaimed == null || toClaim > lastClaimed,
+        "Trying to claim offset %s while last attempted was %s",
+        position.lastOffset().value(),
+        lastClaimed);
+    checkArgument(
+        toClaim >= range.getFrom(),
+        "Trying to claim offset %s before start of the range %s",
+        toClaim,
+        range);
+    // split() has already been called, truncating this range. No more offsets may be claimed.
+    if (range.getTo() != Long.MAX_VALUE) {
+      boolean isRangeEmpty = range.getTo() == range.getFrom();
+      boolean isValidClosedRange = nextOffset() == range.getTo();
+      checkState(
+          isRangeEmpty || isValidClosedRange,
+          "Violated class precondition: offset range improperly split. Please report a beam bug.");
+      return false;
+    }
+    lastClaimed = toClaim;
+    byteCount += position.batchBytes();
+    return true;
+  }
+
+  @Override
+  public OffsetRange currentRestriction() {
+    return range;
+  }
+
+  private long nextOffset() {
+    return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
+  }
+
+  @Override
+  public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
+    // Cannot split a bounded range. This should already be completely claimed.
+    if (range.getTo() != Long.MAX_VALUE) return null;
+    range = new OffsetRange(currentRestriction().getFrom(), nextOffset());
+    return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE));
+  }
+
+  @Override
+  @SuppressWarnings("unboxing.of.nullable")
+  public void checkDone() throws IllegalStateException {
+    backlogReader.close();

Review comment:
       If something goes wrong before we reach `checkDone`, we will leak the `backlogReader` resource.
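A minimal sketch of the general pattern, under the assumption that whoever creates the reader can also own its lifetime: closing in a try-with-resources block runs `close()` even when the work throws. `CloseOnFailureSketch`, `FakeBacklogReader` and `process` are hypothetical names; the fake reader only records whether it was closed and is not the real `TopicBacklogReader`.

```java
// Hypothetical illustration, not part of the PR.
final class CloseOnFailureSketch {
  // Stand-in for TopicBacklogReader; only records whether close() ran.
  static final class FakeBacklogReader implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }
  }

  // Runs the work and closes the reader on both the success and the failure path.
  static void process(FakeBacklogReader reader, Runnable work) {
    try (FakeBacklogReader owned = reader) {
      work.run();
    }
  }

  public static void main(String[] args) {
    FakeBacklogReader reader = new FakeBacklogReader();
    try {
      process(reader, () -> {
        throw new IllegalStateException("failure before checkDone");
      });
    } catch (IllegalStateException e) {
      System.out.println(reader.closed); // prints true
    }
  }
}
```

Whether the tracker or the enclosing DoFn should own the reader is a design choice this sketch does not settle.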

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java
##########
@@ -64,8 +64,8 @@ private PubsubLiteIO() {}
    *     .build()), "read");
    * }</pre>
    */
-  public static Read.Unbounded<SequencedMessage> read(SubscriberOptions options) {
-    return Read.from(new PubsubLiteUnboundedSource(options));
+  public static PTransform<PBegin, PCollection<SequencedMessage>> read(SubscriberOptions options) {

Review comment:
       It seems like `PubsubLiteIO.read()` is for reading from a single topic (subscription). Do we have a plan for `PubsubLiteIO` to expose a `readAll()` API that reads from multiple topics/subscriptions?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
##########
@@ -0,0 +1,119 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+
+public class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>

Review comment:
       Please add some javadoc to this tracker, especially about the assumption that `range.getTo() == Long.MAX_VALUE` and the fact that `trySplit` ignores `fractionOfRemainder`.
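The behaviour that the javadoc would need to describe can be sketched without any Beam types. The class below is a simplified, hypothetical model (`UnboundedSplitSketch` is not in the PR, and it returns a `long[]` where the real tracker returns a `SplitResult<OffsetRange>`): the range starts open-ended, a split always cuts at the next unclaimed offset regardless of the requested fraction, and nothing can be claimed or split afterwards.

```java
/**
 * Hypothetical, simplified model of the tracker's split behaviour.
 *
 * <p>Assumes the range end is Long.MAX_VALUE until the first split. The split point is always the
 * next unclaimed offset; the fractionOfRemainder argument is ignored.
 */
final class UnboundedSplitSketch {
  static final long UNBOUNDED = Long.MAX_VALUE;

  private final long from;
  private long to = UNBOUNDED;
  private Long lastClaimed; // null until the first successful claim

  UnboundedSplitSketch(long from) {
    this.from = from;
  }

  // Claims are refused once the range has been truncated by a split.
  boolean tryClaim(long offset) {
    if (to != UNBOUNDED) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  private long nextOffset() {
    return lastClaimed == null ? from : lastClaimed + 1;
  }

  // Returns {primaryFrom, primaryTo, residualFrom, residualTo}, or null if already split.
  long[] trySplit(double fractionOfRemainder) {
    if (to != UNBOUNDED) {
      return null;
    }
    to = nextOffset();
    return new long[] {from, to, to, UNBOUNDED};
  }

  public static void main(String[] args) {
    UnboundedSplitSketch tracker = new UnboundedSplitSketch(10);
    tracker.tryClaim(15);
    long[] split = tracker.trySplit(0.5); // The 0.5 has no effect on the split point.
    System.out.println(split[1]); // prints 16
    System.out.println(tracker.tryClaim(20)); // prints false
  }
}
```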

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java
##########
@@ -0,0 +1,119 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+
+public class OffsetByteRangeTracker extends RestrictionTracker<OffsetRange, OffsetByteProgress>
+    implements HasProgress {
+  private final TopicBacklogReader backlogReader;
+  private OffsetRange range;
+  private @Nullable Long lastClaimed;
+  private long byteCount = 0;
+
+  public OffsetByteRangeTracker(OffsetRange range, TopicBacklogReader backlogReader) {
+    checkArgument(range.getTo() == Long.MAX_VALUE);
+    this.backlogReader = backlogReader;
+    this.range = range;
+  }
+
+  @Override
+  public IsBounded isBounded() {
+    return IsBounded.UNBOUNDED;
+  }
+
+  @Override
+  public boolean tryClaim(OffsetByteProgress position) {
+    long toClaim = position.lastOffset().value();
+    checkArgument(
+        lastClaimed == null || toClaim > lastClaimed,
+        "Trying to claim offset %s while last attempted was %s",
+        position.lastOffset().value(),
+        lastClaimed);
+    checkArgument(
+        toClaim >= range.getFrom(),
+        "Trying to claim offset %s before start of the range %s",
+        toClaim,
+        range);
+    // split() has already been called, truncating this range. No more offsets may be claimed.
+    if (range.getTo() != Long.MAX_VALUE) {
+      boolean isRangeEmpty = range.getTo() == range.getFrom();
+      boolean isValidClosedRange = nextOffset() == range.getTo();
+      checkState(
+          isRangeEmpty || isValidClosedRange,
+          "Violated class precondition: offset range improperly split. Please report a beam bug.");
+      return false;
+    }
+    lastClaimed = toClaim;
+    byteCount += position.batchBytes();
+    return true;
+  }
+
+  @Override
+  public OffsetRange currentRestriction() {
+    return range;
+  }
+
+  private long nextOffset() {
+    return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1;
+  }
+
+  @Override
+  public @Nullable SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {

Review comment:
       Please file a JIRA and add a TODO here that describes the improvement you are going to make.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java
##########
@@ -0,0 +1,85 @@
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.BufferingPullSubscriber;
+import com.google.cloud.pubsublite.internal.PullSubscriber;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SeekRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+
+class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
+  private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1);
+
+  private final SubscriberOptions options;
+
+  SubscribeTransform(SubscriberOptions options) {
+    this.options = options;
+  }
+
+  private PullSubscriber<SequencedMessage> newPullSubscriber(Partition partition, Offset offset)
+      throws ApiException {
+    try {
+      return new TranslatingPullSubscriber(
+          new BufferingPullSubscriber(
+              options.getSubscriberFactory(partition),
+              options.flowControlSettings(),
+              SeekRequest.newBuilder()
+                  .setCursor(Cursor.newBuilder().setOffset(offset.value()))
+                  .build()));
+    } catch (Throwable t) {
+      throw toCanonical(t).underlying;
+    }
+  }
+
+  private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracker(
+      Partition partition, OffsetRange initial) {
+    return new OffsetByteRangeTracker(initial, options.getBacklogReader(partition));
+  }
+
+  @Override
+  public PCollection<SequencedMessage> expand(PBegin input) {
+    PCollection<Partition> partitions = Create.of(options.partitions()).expand(input);
+    // Prevent fusion between Create and read.
+    PCollection<Partition> shuffledPartitions = partitions.apply(Reshuffle.viaRandomKey());
+    return shuffledPartitions.apply(
+        ParDo.of(
+            new PerPartitionSdf(

Review comment:
       I'm wondering whether it makes sense for `PerPartitionSdf` to read from a `SubscriberOptions`, or from something else that can locate a read (topic + partition + something else). That would also help us enable the `readAll()` API I mentioned above, and `PerPartitionSdf` would then be able to read from subscriptions/partitions that are created during pipeline execution.
   One major feature request for Kafka IO is reading from newly added topics/partitions dynamically, and I think PubsubLiteIO might have similar customer needs.
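To illustrate what "something that can locate a read" might look like, here is a small value class that bundles a subscription path with a partition, so each SDF element describes its own read. Everything in it is hypothetical: the name `ReadDescriptorSketch`, the choice of fields, and the use of a plain `String` and `long` instead of the Pub/Sub Lite types. The "something else" from the comment above is left out.

```java
import java.util.Objects;

// Hypothetical element type; neither the name nor the fields come from the PR.
final class ReadDescriptorSketch {
  final String subscriptionPath;
  final long partition;

  ReadDescriptorSketch(String subscriptionPath, long partition) {
    this.subscriptionPath = Objects.requireNonNull(subscriptionPath);
    this.partition = partition;
  }

  // Value equality, so descriptors can be deduplicated or used as keys.
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ReadDescriptorSketch)) {
      return false;
    }
    ReadDescriptorSketch other = (ReadDescriptorSketch) o;
    return partition == other.partition && subscriptionPath.equals(other.subscriptionPath);
  }

  @Override
  public int hashCode() {
    return Objects.hash(subscriptionPath, partition);
  }

  @Override
  public String toString() {
    return subscriptionPath + "#" + partition;
  }

  public static void main(String[] args) {
    // The path below is a made-up placeholder, not a real resource.
    System.out.println(new ReadDescriptorSketch("example-subscription", 3));
  }
}
```

With an element type along these lines, a `readAll()` transform could take a `PCollection` of descriptors as input instead of fixing the subscription at construction time.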



