You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:00:06 UTC

[GitHub] [beam] BenWhitehead commented on a change in pull request #15005: [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations

BenWhitehead commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r654656702



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -700,6 +723,142 @@ public long nextBackOffMillis() {
     }
   }
 
+  /**
+   * This class implements a backoff algorithm similar to that of {@link
+   * org.apache.beam.sdk.util.FluentBackoff} with a could key differences:
+   *
+   * <ol>
+   *   <li>A set of status code numbers may be specified to have a graceful evaluation
+   *   <li>Gracefully evaluated status code numbers will increment a decaying counter to ensure if
+   *       the graceful status code numbers occur more than once in the previous 60 seconds the
+   *       regular backoff behavior will kick in.
+   *   <li>The random number generator used to induce jitter is provided via constructor parameter
+   *       rather than using {@link Math#random()}}
+   * </ol>
+   *
+   * The primary motivation for creating this implementation is to support streamed responses from
+   * Firestore. In the case of RunQuery and BatchGet the results are returned via stream. The result
+   * stream has a maximum lifetime of 60 seconds before it will be broken and an UNAVAILABLE status
+   * code will be raised. Give this UNAVAILABLE is expected for streams this class allows for
+   * defining a set of status code numbers which are give a grace count of 1 before backoff kicks
+   * in. When backoff does kick in, it is implemented using the same calculations as {@link
+   * org.apache.beam.sdk.util.FluentBackoff}.
+   */
+  static final class StatusCodeAwareBackoff {
+    private static final double RANDOMIZATION_FACTOR = 0.5;

Review comment:
       It does, here is the declaration site https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java#L36 and the use site https://github.com/apache/beam/blob/0c01636fc8610414859d946cb93eabc904123fc8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FluentBackoff.java#L198-L199

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable PartitionQueryResponse.Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? mostRecentResponse.getFound().getName() : null;
+      // we only scan until the second to last originalRequest. If the final element were to be
+      // reached
+      // the full request would be complete and we wouldn't be in this scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {

Review comment:
       We potentially could, but that'd require a refactor to the `org.apache.beam.sdk.io.gcp.firestore.FirestoreV1ReadFn.StreamingFirestoreV1ReadFn` to track number of elements received (which is shared with `RunQuery`). I'm not sure it'd provide much material value though.
   
   Even for a list of 10m document names a linear scan for the 2nd to last value can happen in ~120ms which compared to an RPC is negligible.
   
   <details>
     <summary>Scan perf test</summary>
     
   ##### Test
   ```java
     @Test
     public void scanMicroBench() {
       List<String> strings = IntStream.rangeClosed(1, 10_000_000)
           .mapToObj(i -> String.format("projects/p1/databases/d1/documents/c1/doc2/c2/doc%d", i))
           .collect(Collectors.toList());
   
       List<String> searchValues = List.of(
           "projects/p1/databases/d1/documents/c1/doc2/c2/doc100",
           "projects/p1/databases/d1/documents/c1/doc2/c2/doc10000",
           "projects/p1/databases/d1/documents/c1/doc2/c2/doc1000000",
           "projects/p1/databases/d1/documents/c1/doc2/c2/doc9999999"
       );
   
       for (String searchValue : searchValues) {
         var sw = Stopwatch.createStarted();
         for (int i = 0; i < strings.size(); i++) {
           String s = strings.get(i);
           if (searchValue.equals(s)) {
             break;
           }
         }
         var stop = sw.stop();
         System.out.printf("search[%s] %s%n", searchValue, stop.toString());
       }
     }
   ```
   
   ##### Output
   ```
   search[projects/p1/databases/d1/documents/c1/doc2/c2/doc100] 18.72 μs
   search[projects/p1/databases/d1/documents/c1/doc2/c2/doc10000] 683.9 μs
   search[projects/p1/databases/d1/documents/c1/doc2/c2/doc1000000] 19.83 ms
   search[projects/p1/databases/d1/documents/c1/doc2/c2/doc9999999] 127.4 ms
   ```
   
   </details>

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable PartitionQueryResponse.Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? mostRecentResponse.getFound().getName() : null;
+      // we only scan until the second to last originalRequest. If the final element were to be
+      // reached
+      // the full request would be complete and we wouldn't be in this scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {
+        String docName = documentsList.get(i);
+        if (docName.equals(missing) || docName.equals(foundName)) {
+          startIndex = i;
+          break;
+        }
+      }
+      if (0 <= startIndex) {
+        BatchGetDocumentsRequest.Builder builder = originalRequest.toBuilder().clearDocuments();
+        documentsList.stream()
+            .skip(startIndex + 1) // start from the next entry from the one we found
+            .forEach(builder::addDocuments);
+        return builder.build();
+      }
+      // unable to find a match, return the original request
+      return originalRequest;

Review comment:
       I can update it to throw an error. What information would like to see included in the error message?
   
   Something like the following?
   ```suggestion
         throw new IllegalStateException(
             String.format(
                 "Unable to determine BatchGet resumption point. Most recently received doc __name__ '%s'",
                 foundName != null ? foundName : missing));
       }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org