You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/16 00:38:33 UTC

[GitHub] [beam] danthev commented on a change in pull request #15005: [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations

danthev commented on a change in pull request #15005:
URL: https://github.com/apache/beam/pull/15005#discussion_r652221313



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable PartitionQueryResponse.Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? mostRecentResponse.getFound().getName() : null;
+      // we only scan until the second to last originalRequest. If the final element were to be
+      // reached
+      // the full request would be complete and we wouldn't be in this scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {

Review comment:
       I don't know much about what the response will look like. But given that you skip everything up to `startIndex`, it seems safe to assume this is always sequential? If so, can you advance an object-level counter instead of iterating over all document names every time?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -700,6 +723,142 @@ public long nextBackOffMillis() {
     }
   }
 
+  /**
+   * This class implements a backoff algorithm similar to that of {@link
+   * org.apache.beam.sdk.util.FluentBackoff} with a could key differences:

Review comment:
       Typo

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/RpcQosImpl.java
##########
@@ -700,6 +723,142 @@ public long nextBackOffMillis() {
     }
   }
 
+  /**
+   * This class implements a backoff algorithm similar to that of {@link
+   * org.apache.beam.sdk.util.FluentBackoff} with a could key differences:
+   *
+   * <ol>
+   *   <li>A set of status code numbers may be specified to have a graceful evaluation
+   *   <li>Gracefully evaluated status code numbers will increment a decaying counter to ensure if
+   *       the graceful status code numbers occur more than once in the previous 60 seconds the
+   *       regular backoff behavior will kick in.
+   *   <li>The random number generator used to induce jitter is provided via constructor parameter
+   *       rather than using {@link Math#random()}}
+   * </ol>
+   *
+   * The primary motivation for creating this implementation is to support streamed responses from
+   * Firestore. In the case of RunQuery and BatchGet the results are returned via stream. The result
+   * stream has a maximum lifetime of 60 seconds before it will be broken and an UNAVAILABLE status
+   * code will be raised. Give this UNAVAILABLE is expected for streams this class allows for
+   * defining a set of status code numbers which are give a grace count of 1 before backoff kicks
+   * in. When backoff does kick in, it is implemented using the same calculations as {@link
+   * org.apache.beam.sdk.util.FluentBackoff}.
+   */
+  static final class StatusCodeAwareBackoff {
+    private static final double RANDOMIZATION_FACTOR = 0.5;

Review comment:
       This seems like a lot, does `FluentBackoff` have that much fuzziness as well?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable PartitionQueryResponse.Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? mostRecentResponse.getFound().getName() : null;
+      // we only scan until the second to last originalRequest. If the final element were to be
+      // reached
+      // the full request would be complete and we wouldn't be in this scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {
+        String docName = documentsList.get(i);
+        if (docName.equals(missing) || docName.equals(foundName)) {
+          startIndex = i;
+          break;
+        }
+      }
+      if (0 <= startIndex) {
+        BatchGetDocumentsRequest.Builder builder = originalRequest.toBuilder().clearDocuments();
+        documentsList.stream()
+            .skip(startIndex + 1) // start from the next entry from the one we found
+            .forEach(builder::addDocuments);
+        return builder.build();
+      }
+      // unable to find a match, return the original request
+      return originalRequest;

Review comment:
       Will this only ever happen on the first response with the transaction field set? Maybe you can add a comment here to make this clearer. If there's any other way this could fail I prefer an exception.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable PartitionQueryResponse.Builder aggregate) {

Review comment:
       Wrong import

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1ReadFn.java
##########
@@ -0,0 +1,632 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.api.gax.paging.AbstractPage;
+import com.google.api.gax.paging.AbstractPagedListResponse;
+import com.google.api.gax.rpc.ServerStream;
+import com.google.api.gax.rpc.ServerStreamingCallable;
+import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListCollectionIdsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPage;
+import com.google.cloud.firestore.v1.FirestoreClient.ListDocumentsPagedResponse;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPage;
+import com.google.cloud.firestore.v1.FirestoreClient.PartitionQueryPagedResponse;
+import com.google.cloud.firestore.v1.stub.FirestoreStub;
+import com.google.firestore.v1.BatchGetDocumentsRequest;
+import com.google.firestore.v1.BatchGetDocumentsResponse;
+import com.google.firestore.v1.Cursor;
+import com.google.firestore.v1.ListCollectionIdsRequest;
+import com.google.firestore.v1.ListCollectionIdsResponse;
+import com.google.firestore.v1.ListDocumentsRequest;
+import com.google.firestore.v1.ListDocumentsResponse;
+import com.google.firestore.v1.PartitionQueryRequest;
+import com.google.firestore.v1.PartitionQueryResponse;
+import com.google.firestore.v1.RunQueryRequest;
+import com.google.firestore.v1.RunQueryResponse;
+import com.google.firestore.v1.StructuredQuery;
+import com.google.firestore.v1.StructuredQuery.Direction;
+import com.google.firestore.v1.StructuredQuery.FieldReference;
+import com.google.firestore.v1.StructuredQuery.Order;
+import com.google.firestore.v1.Value;
+import com.google.protobuf.Message;
+import com.google.protobuf.ProtocolStringList;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.NonWindowAwareDoFn;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1Fn.HasRpcAttemptContext;
+import org.apache.beam.sdk.io.gcp.firestore.RpcQos.RpcAttempt.Context;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A collection of {@link org.apache.beam.sdk.transforms.DoFn DoFn}s for each of the supported read
+ * RPC methods from the Cloud Firestore V1 API.
+ */
+final class FirestoreV1ReadFn {
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link RunQueryRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>If an error is encountered while reading from the stream, the stream will attempt to resume
+   * rather than starting over. The restarting of the stream will continue within the scope of the
+   * completion of the request (meaning any possibility of resumption is contingent upon an attempt
+   * being available in the Qos budget).
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class RunQueryFn
+      extends StreamingFirestoreV1ReadFn<RunQueryRequest, RunQueryResponse> {
+
+    RunQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.RunQuery;
+    }
+
+    @Override
+    protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.runQueryCallable();
+    }
+
+    @Override
+    protected RunQueryRequest setStartFrom(
+        RunQueryRequest element, RunQueryResponse runQueryResponse) {
+      StructuredQuery query = element.getStructuredQuery();
+      StructuredQuery.Builder builder;
+      List<Order> orderByList = query.getOrderByList();
+      // if the orderByList is empty that means the default sort of "__name__ ASC" will be used
+      // Before we can set the cursor to the last document name read, we need to explicitly add
+      // the order of "__name__ ASC" because a cursor value must map to an order by
+      if (orderByList.isEmpty()) {
+        builder =
+            query
+                .toBuilder()
+                .addOrderBy(
+                    Order.newBuilder()
+                        .setField(FieldReference.newBuilder().setFieldPath("__name__").build())
+                        .setDirection(Direction.ASCENDING)
+                        .build())
+                .setStartAt(
+                    Cursor.newBuilder()
+                        .setBefore(false)
+                        .addValues(
+                            Value.newBuilder()
+                                .setReferenceValue(runQueryResponse.getDocument().getName())
+                                .build()));
+      } else {
+        Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
+        Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
+        for (Order order : orderByList) {
+          String fieldPath = order.getField().getFieldPath();
+          Value value = fieldsMap.get(fieldPath);
+          if (value != null) {
+            cursor.addValues(value);
+          } else if ("__name__".equals(fieldPath)) {
+            cursor.addValues(
+                Value.newBuilder()
+                    .setReferenceValue(runQueryResponse.getDocument().getName())
+                    .build());
+          }
+        }
+        builder = query.toBuilder().setStartAt(cursor.build());
+      }
+      return element.toBuilder().setStructuredQuery(builder.build()).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link PartitionQueryRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, all pages will be aggregated before being
+   * emitted to the next stage of the pipeline. Aggregation of pages is necessary as the next step
+   * of pairing of cursors to create N queries must first sort all cursors. See <a target="_blank"
+   * rel="noopener noreferrer"
+   * href="https://cloud.google.com/firestore/docs/reference/rest/v1/projects.databases.documents/partitionQuery#request-body">{@code
+   * pageToken}s</a> documentation for details.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class PartitionQueryFn
+      extends BaseFirestoreV1ReadFn<PartitionQueryRequest, PartitionQueryPair> {
+
+    public PartitionQueryFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.PartitionQuery;
+    }
+
+    @Override
+    public void processElement(ProcessContext context) throws Exception {
+      @SuppressWarnings("nullness")
+      final PartitionQueryRequest element =
+          requireNonNull(context.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      PartitionQueryResponse.Builder aggregate = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          PartitionQueryRequest request = setPageToken(element, aggregate);
+          attempt.recordRequestStart(clock.instant());
+          PartitionQueryPagedResponse pagedResponse =
+              firestoreStub.partitionQueryPagedCallable().call(request);
+          for (PartitionQueryPage page : pagedResponse.iteratePages()) {
+            attempt.recordRequestSuccessful(clock.instant());
+            PartitionQueryResponse response = page.getResponse();
+            if (aggregate == null) {
+              aggregate = response.toBuilder();
+            } else {
+              aggregate.addAllPartitions(response.getPartitionsList());
+              if (page.hasNextPage()) {
+                aggregate.setNextPageToken(response.getNextPageToken());
+              } else {
+                aggregate.clearNextPageToken();
+              }
+            }
+            if (page.hasNextPage()) {
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+      if (aggregate != null) {
+        context.output(new PartitionQueryPair(element, aggregate.build()));
+      }
+    }
+
+    private PartitionQueryRequest setPageToken(
+        PartitionQueryRequest request,
+        @edu.umd.cs.findbugs.annotations.Nullable PartitionQueryResponse.Builder aggregate) {
+      if (aggregate != null && aggregate.getNextPageToken() != null) {
+        return request.toBuilder().setPageToken(aggregate.getNextPageToken()).build();
+      }
+      return request;
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListDocumentsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListDocumentsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListDocumentsRequest,
+          ListDocumentsPagedResponse,
+          ListDocumentsPage,
+          ListDocumentsResponse> {
+
+    ListDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListDocuments;
+    }
+
+    @Override
+    protected UnaryCallable<ListDocumentsRequest, ListDocumentsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listDocumentsPagedCallable();
+    }
+
+    @Override
+    protected ListDocumentsRequest setPageToken(
+        ListDocumentsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link ListCollectionIdsRequest}s.
+   *
+   * <p>This Fn uses pagination to obtain responses, the response from each page will be output to
+   * the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class ListCollectionIdsFn
+      extends PaginatedFirestoreV1ReadFn<
+          ListCollectionIdsRequest,
+          ListCollectionIdsPagedResponse,
+          ListCollectionIdsPage,
+          ListCollectionIdsResponse> {
+
+    ListCollectionIdsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.ListCollectionIds;
+    }
+
+    @Override
+    protected UnaryCallable<ListCollectionIdsRequest, ListCollectionIdsPagedResponse> getCallable(
+        FirestoreStub firestoreStub) {
+      return firestoreStub.listCollectionIdsPagedCallable();
+    }
+
+    @Override
+    protected ListCollectionIdsRequest setPageToken(
+        ListCollectionIdsRequest request, String nextPageToken) {
+      return request.toBuilder().setPageToken(nextPageToken).build();
+    }
+  }
+
+  /**
+   * {@link DoFn} for Firestore V1 {@link BatchGetDocumentsRequest}s.
+   *
+   * <p>This Fn uses a stream to obtain responses, each response from the stream will be output to
+   * the next stage of the pipeline. Each response from the stream represents an individual document
+   * with the associated metadata.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   */
+  static final class BatchGetDocumentsFn
+      extends StreamingFirestoreV1ReadFn<BatchGetDocumentsRequest, BatchGetDocumentsResponse> {
+
+    BatchGetDocumentsFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public Context getRpcAttemptContext() {
+      return FirestoreV1Fn.V1FnRpcAttemptContext.BatchGetDocuments;
+    }
+
+    @Override
+    protected ServerStreamingCallable<BatchGetDocumentsRequest, BatchGetDocumentsResponse>
+        getCallable(FirestoreStub firestoreStub) {
+      return firestoreStub.batchGetDocumentsCallable();
+    }
+
+    @Override
+    protected BatchGetDocumentsRequest setStartFrom(
+        BatchGetDocumentsRequest originalRequest, BatchGetDocumentsResponse mostRecentResponse) {
+      int startIndex = -1;
+      ProtocolStringList documentsList = originalRequest.getDocumentsList();
+      String missing = mostRecentResponse.getMissing();
+      String foundName =
+          mostRecentResponse.hasFound() ? mostRecentResponse.getFound().getName() : null;
+      // we only scan until the second to last originalRequest. If the final element were to be
+      // reached
+      // the full request would be complete and we wouldn't be in this scenario
+      int maxIndex = documentsList.size() - 2;
+      for (int i = 0; i <= maxIndex; i++) {
+        String docName = documentsList.get(i);
+        if (docName.equals(missing) || docName.equals(foundName)) {
+          startIndex = i;
+          break;
+        }
+      }
+      if (0 <= startIndex) {
+        BatchGetDocumentsRequest.Builder builder = originalRequest.toBuilder().clearDocuments();
+        documentsList.stream()
+            .skip(startIndex + 1) // start from the next entry from the one we found
+            .forEach(builder::addDocuments);
+        return builder.build();
+      }
+      // unable to find a match, return the original request
+      return originalRequest;
+    }
+  }
+
+  /**
+   * {@link DoFn} Providing support for a Read type RPC operation which uses a Stream rather than
+   * pagination. Each response from the stream will be output to the next stage of the pipeline.
+   *
+   * <p>All request quality-of-service is managed via the instance of {@link RpcQos} associated with
+   * the lifecycle of this Fn.
+   *
+   * @param <InT> Request type
+   * @param <OutT> Response type
+   */
+  private abstract static class StreamingFirestoreV1ReadFn<
+          InT extends Message, OutT extends Message>
+      extends BaseFirestoreV1ReadFn<InT, OutT> {
+
+    protected StreamingFirestoreV1ReadFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    protected abstract ServerStreamingCallable<InT, OutT> getCallable(FirestoreStub firestoreStub);
+
+    protected abstract InT setStartFrom(InT element, OutT out);
+
+    @Override
+    public final void processElement(ProcessContext c) throws Exception {
+      @SuppressWarnings(
+          "nullness") // for some reason requireNonNull thinks its parameter but be non-null...
+      final InT element = requireNonNull(c.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      OutT lastReceivedValue = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        Instant start = clock.instant();
+        try {
+          InT request =
+              lastReceivedValue == null ? element : setStartFrom(element, lastReceivedValue);
+          attempt.recordRequestStart(start);
+          ServerStream<OutT> serverStream = getCallable(firestoreStub).call(request);
+          attempt.recordRequestSuccessful(clock.instant());
+          for (OutT out : serverStream) {
+            lastReceivedValue = out;
+            attempt.recordStreamValue(clock.instant());
+            c.output(out);
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+    }
+  }
+
+  /**
+   * {@link DoFn} Providing support for a Read type RPC operation which uses pagination rather than
+   * a Stream.
+   *
+   * @param <RequestT> Request type
+   * @param <ResponseT> Response type
+   */
+  @SuppressWarnings({
+    // errorchecker doesn't like the second ? on PagedResponse, seemingly because of different
+    // recursion depth limits; 3 on the found vs 4 on the required.
+    // The second ? is the type of collection the paged response uses to hold all responses if
+    // trying to expand all pages to a single collection. We are emitting a single page at at time
+    // while tracking read progress so we can resume if an error has occurred and we still have
+    // attempt budget available.
+    "type.argument.type.incompatible"
+  })
+  private abstract static class PaginatedFirestoreV1ReadFn<
+          RequestT extends Message,
+          PagedResponseT extends AbstractPagedListResponse<RequestT, ResponseT, ?, PageT, ?>,
+          PageT extends AbstractPage<RequestT, ResponseT, ?, PageT>,
+          ResponseT extends Message>
+      extends BaseFirestoreV1ReadFn<RequestT, ResponseT> {
+
+    protected PaginatedFirestoreV1ReadFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    protected abstract UnaryCallable<RequestT, PagedResponseT> getCallable(
+        FirestoreStub firestoreStub);
+
+    protected abstract RequestT setPageToken(RequestT request, String nextPageToken);
+
+    @Override
+    public final void processElement(ProcessContext c) throws Exception {
+      @SuppressWarnings(
+          "nullness") // for some reason requireNonNull thinks its parameter but be non-null...
+      final RequestT element = requireNonNull(c.element(), "c.element() must be non null");
+
+      RpcQos.RpcReadAttempt attempt = rpcQos.newReadAttempt(getRpcAttemptContext());
+      String nextPageToken = null;
+      while (true) {
+        if (!attempt.awaitSafeToProceed(clock.instant())) {
+          continue;
+        }
+
+        try {
+          RequestT request = nextPageToken == null ? element : setPageToken(element, nextPageToken);
+          attempt.recordRequestStart(clock.instant());
+          PagedResponseT pagedResponse = getCallable(firestoreStub).call(request);
+          for (PageT page : pagedResponse.iteratePages()) {
+            ResponseT response = page.getResponse();
+            attempt.recordRequestSuccessful(clock.instant());
+            c.output(response);
+            if (page.hasNextPage()) {
+              nextPageToken = page.getNextPageToken();
+              attempt.recordRequestStart(clock.instant());
+            }
+          }
+          attempt.completeSuccess();
+          break;
+        } catch (RuntimeException exception) {
+          Instant end = clock.instant();
+          attempt.recordRequestFailed(end);
+          attempt.checkCanRetry(end, exception);
+        }
+      }
+    }
+  }
+
+  /**
+   * Base class for all {@link org.apache.beam.sdk.transforms.DoFn DoFn}s which provide access to
+   * RPCs from the Cloud Firestore V1 API.
+   *
+   * <p>This class takes care of common lifecycle elements and transient state management for
+   * subclasses allowing subclasses to provide the minimal implementation for {@link
+   * NonWindowAwareDoFn#processElement(DoFn.ProcessContext)}}
+   *
+   * @param <InT> The type of element coming into this {@link DoFn}
+   * @param <OutT> The type of element output from this {@link DoFn}
+   */
+  abstract static class BaseFirestoreV1ReadFn<InT, OutT> extends NonWindowAwareDoFn<InT, OutT>
+      implements HasRpcAttemptContext {
+
+    protected final JodaClock clock;
+    protected final FirestoreStatefulComponentFactory firestoreStatefulComponentFactory;
+    protected final RpcQosOptions rpcQosOptions;
+
+    // transient running state information, not important to any possible checkpointing
+    protected transient FirestoreStub firestoreStub;
+    protected transient RpcQos rpcQos;
+    protected transient String projectId;
+
+    @SuppressWarnings(
+        "initialization.fields.uninitialized") // allow transient fields to be managed by component
+    // lifecycle
+    protected BaseFirestoreV1ReadFn(
+        JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory,
+        RpcQosOptions rpcQosOptions) {
+      this.clock = requireNonNull(clock, "clock must be non null");
+      this.firestoreStatefulComponentFactory =
+          requireNonNull(firestoreStatefulComponentFactory, "firestoreFactory must be non null");
+      this.rpcQosOptions = requireNonNull(rpcQosOptions, "rpcQosOptions must be non null");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void setup() {
+      rpcQos = firestoreStatefulComponentFactory.getRpcQos(rpcQosOptions);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final void startBundle(StartBundleContext c) {
+      String project = c.getPipelineOptions().as(GcpOptions.class).getProject();
+      projectId =
+          requireNonNull(project, "project must be defined on GcpOptions of PipelineOptions");
+      firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("nullness") // allow clearing transient fields
+    @Override
+    public void finishBundle() throws Exception {
+      projectId = null;
+      firestoreStub.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public final void populateDisplayData(
+        @edu.umd.cs.findbugs.annotations.NonNull DisplayData.Builder builder) {

Review comment:
       Wrong import




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org