Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/29 18:44:52 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #14261: [BEAM-8376] Google Cloud Firestore Connector - Add Firestore V1 Write Operations

chamikaramj commented on a change in pull request #14261:
URL: https://github.com/apache/beam/pull/14261#discussion_r595689521



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreIO.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+/**
+ * {@link FirestoreIO} provides an API for reading from and writing to <a
+ * href="https://cloud.google.com/firestore/docs">Google Cloud Firestore</a>.
+ *
+ * <p>For documentation see {@link FirestoreV1}.
+ */
+public final class FirestoreIO {

Review comment:
       Please add an "@Experimental(Kind.SOURCE_SINK)" annotation [1] to this class until the API stabilizes and the GCP support story for this connector is resolved.
   
   [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java#L57

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides lifecycle-managed {@link PTransform}s for the
+ * <a href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database">Create
+ * a Firestore in Native mode database
+ * </a> for security- and permission-related information specific to Cloud Firestore.
+ *
+ * <p>Optionally, a locally running Cloud Firestore V1 Emulator can be used for testing by
+ * providing its host and port via {@link FirestoreIOOptions#setEmulatorHostPort(String)}.
+ * In that case, all Cloud Firestore API calls are directed to the Emulator.
+ *
+ * @see FirestoreIO#v1()
+ * @see org.apache.beam.sdk.PipelineRunner
+ * @see org.apache.beam.sdk.options.PipelineOptions
+ * @see org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * @see <a href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>
+ */
+@Immutable
+public final class FirestoreV1 {
+  static final FirestoreV1 INSTANCE = new FirestoreV1();
+
+  private FirestoreV1() {}
+
+  /**
+   * The class returned by this method provides the ability to create {@link PTransform PTransforms}
+   * for write operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.spi.v1.FirestoreRpc FirestoreRpc}.
+   * <p/>
+   * This method is part of the Firestore Connector DSL and should be accessed via {@link
+   * FirestoreIO#v1()}.
+   * <p/>
+   *
+   * @return Type safe builder factory for write operations.
+   * @see FirestoreIO#v1()
+   */
+  public final Write write() {
+    return Write.INSTANCE;
+  }
+
+  /**
+   * Type safe builder factory for write operations.
+   * <p/>
+   * This class is part of the Firestore Connector DSL and should be accessed via {@link #write()
+   * FirestoreIO.v1().write()}.
+   * <p/>
+   * <p/>
+   * This class provides access to a set of type safe builders for supported write operations
+   * available in the Firestore V1 API accessed through {@link com.google.cloud.firestore.spi.v1.FirestoreRpc
+   * FirestoreRpc}. Each builder allows configuration before creating an immutable instance which
+   * can be used in your pipeline.
+   *
+   * @see FirestoreIO#v1()
+   * @see #write()
+   */
+  @Immutable
+  public static final class Write {

Review comment:
       Please add @Experimental(Kind.SOURCE_SINK) to this transform.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file

Review comment:
       It should be possible to trigger this IT from the PR to make sure it passes. Lemme know if you need help with this.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides lifecycle-managed {@link PTransform}s for the
+ * <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Operations</h3>
+ * <h4>Write</h4>
+ * To write a {@link PCollection} to Cloud Firestore, use {@link FirestoreV1#write()} and pick the
+ * behavior of the writer.
+ *
+ * Writes use Cloud Firestore's BatchWrite API, which provides fine-grained write semantics.
+ *
+ * The default behavior is to fail a bundle if any single write fails with a non-retryable error.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PDone sink = writes
+ *     .apply(FirestoreIO.v1().write().batchWrite().build());
+ * }</pre>
+ *
+ * Alternatively, if you'd rather output write failures to a Dead Letter Queue, add
+ * {@link BatchWrite.Builder#withDeadLetterQueue() withDeadLetterQueue} when building your writer.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;

Review comment:
       One option might be to return a WriteResult with getters that expose either a regular result PCollection or a dead-letter queue. We have something like that for the BigQuery sink.
   
   https://github.com/apache/beam/blob/0e47ea3a67acbb41b8f9357d722bbd565bc74105/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L1695
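A minimal, stdlib-only sketch of the suggested result shape, assuming hypothetical names (`FirestoreWriteResult`, `getSuccessfulWrites()`, `getFailedWrites()`, `partition`). Plain `List`s stand in for `PCollection`s, and the predicate stands in for the per-write status a BatchWrite response would report; this is not the connector's or BigQueryIO's actual API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical result holder modeled on the suggestion above. In a real Beam sink the
// getters would return PCollections; Lists keep this sketch self-contained.
final class FirestoreWriteResult<T> {
  private final List<T> successfulWrites;
  private final List<T> failedWrites;

  FirestoreWriteResult(List<T> successfulWrites, List<T> failedWrites) {
    this.successfulWrites = Collections.unmodifiableList(successfulWrites);
    this.failedWrites = Collections.unmodifiableList(failedWrites);
  }

  /** Writes that were accepted; downstream steps can continue from here. */
  List<T> getSuccessfulWrites() {
    return successfulWrites;
  }

  /** Writes that failed with a non-retryable error (the dead-letter queue). */
  List<T> getFailedWrites() {
    return failedWrites;
  }

  /** Splits writes using a caller-supplied predicate that stands in for the RPC outcome. */
  static <T> FirestoreWriteResult<T> partition(List<T> writes, Predicate<T> succeeded) {
    List<T> ok = new ArrayList<>();
    List<T> failed = new ArrayList<>();
    for (T write : writes) {
      if (succeeded.test(write)) {
        ok.add(write);
      } else {
        failed.add(write);
      }
    }
    return new FirestoreWriteResult<>(ok, failed);
  }
}
```

With this shape a single transform can serve both the default and the dead-letter use case: callers that only care about failures read `getFailedWrites()`, much like `getFailedInserts()` on BigQueryIO's `WriteResult`, while others chain off the successful writes.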

##########
File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
@@ -21,27 +21,22 @@ package org.apache.beam.gradle
 import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
 import groovy.json.JsonOutput
 import groovy.json.JsonSlurper
-import org.gradle.api.attributes.Category
 import org.gradle.api.GradleException
 import org.gradle.api.Plugin
 import org.gradle.api.Project
 import org.gradle.api.Task
 import org.gradle.api.artifacts.Configuration
 import org.gradle.api.artifacts.ProjectDependency
+import org.gradle.api.attributes.Category
 import org.gradle.api.file.FileCollection
 import org.gradle.api.file.FileTree
 import org.gradle.api.plugins.quality.Checkstyle
 import org.gradle.api.publish.maven.MavenPublication
-import org.gradle.api.tasks.Copy
-import org.gradle.api.tasks.Delete
-import org.gradle.api.tasks.Exec
-import org.gradle.api.tasks.JavaExec
+import org.gradle.api.tasks.*

Review comment:
       You might have to expand this wildcard import into explicit imports to pass the Beam lint checks.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreIOOptions.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import com.google.cloud.firestore.FirestoreOptions;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Description("Options used to configure Cloud Firestore IO")
+public interface FirestoreIOOptions extends PipelineOptions {

Review comment:
       Consider naming this just 'FirestoreOptions'.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * Base class for all {@link DoFn} defined in the Firestore Connector.
+ * <p/>
+ * This class defines all of the lifecycle events as abstract methods, ensuring each is accounted
+ * for in any implementing function.
+ * @param <In> The type of the previous stage of the pipeline
+ * @param <Out> The type output to the next stage of the pipeline
+ */
+abstract class FirestoreDoFn<In, Out> extends DoFn<In, Out> {
+
+  @Override
+  public abstract void populateDisplayData(DisplayData.Builder builder);
+
+  /**
+   * @see org.apache.beam.sdk.transforms.DoFn.Setup
+   */
+  @Setup
+  public abstract void setup() throws Exception;
+
+  /**
+   * @see org.apache.beam.sdk.transforms.DoFn.StartBundle
+   */
+  @StartBundle
+  public abstract void startBundle(DoFn<In, Out>.StartBundleContext context) throws Exception;

Review comment:
       I think it will be fine to just remove FirestoreDoFn and add these annotations to the actual DoFn implementations.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ * Base class for all {@link DoFn} defined in the Firestore Connector.

Review comment:
       Are you sure this will be the base class for all DoFns used by the source and sink? A DoFn can include any generic functionality, and many sources/sinks break up functionality into multiple DoFns for various reasons (readability, modularity, persistence of input, etc.).

##########
File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
@@ -425,6 +420,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def errorprone_version = "2.3.4"
     def google_clients_version = "1.31.0"
     def google_cloud_bigdataoss_version = "2.1.6"
+    def google_cloud_firestore_version = "2.2.4"

Review comment:
       Is it possible to depend on the GCP BOM instead of adding a separate dependency? That's highly preferable, since it will allow GCP users to maintain a consistent environment without running into diamond dependency conflicts. We currently depend on com.google.cloud:libraries-bom:16.3.0. Not sure whether you'll have to upgrade this or not.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides lifecycle-managed {@link PTransform}s for the
+ * <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Operations</h3>
+ * <h4>Write</h4>
+ * To write a {@link PCollection} to Cloud Firestore, use {@link FirestoreV1#write()} and pick the
+ * behavior of the writer.
+ *
+ * Writes use Cloud Firestore's BatchWrite API, which provides fine-grained write semantics.
+ *
+ * The default behavior is to fail a bundle if any single write fails with a non-retryable error.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PDone sink = writes

Review comment:
       Returning PDone from a sink is an anti-pattern, since users will not be able to continue the pipeline (for example, write to system X after writing to Firestore). We should think of some useful PCollection that we can return from the Firestore sink. If nothing else, this can just be 'null' values (per-window) or the original input PCollection.
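To illustrate the "return the original input" option, here is a stdlib-only sketch in which a hypothetical write step performs its side effect and then hands its input back, so a second sink can be chained after it. `PassThroughSink` and `writeAndPassThrough` are illustrative names, not Beam APIs, and `List`s stand in for `PCollection`s; a real runner would process elements in parallel, so the sketch only shows the shape of the return value.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical pass-through sink: unlike a PDone-returning transform, it returns
// something downstream steps can consume.
final class PassThroughSink {
  private PassThroughSink() {}

  /**
   * Performs the side-effecting write for every element, then returns the original
   * input so that later steps (e.g. a write to another system) can depend on it.
   */
  static <T> List<T> writeAndPassThrough(List<T> input, Consumer<T> writer) {
    for (T element : input) {
      writer.accept(element);
    }
    return input;
  }
}
```

In Beam itself, a returned PCollection like this could also serve as a signal for sequencing, for example with `Wait.on(...)`, which is not possible when the sink returns `PDone`.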

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides lifecycle-managed {@link PTransform}s for the
+ * <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Operations</h3>
+ * <h4>Write</h4>
+ * To write a {@link PCollection} to Cloud Firestore, use {@link FirestoreV1#write()} and pick the
+ * behavior of the writer.
+ *
+ * Writes use Cloud Firestore's BatchWrite API, which provides fine-grained write semantics.
+ *
+ * The default behavior is to fail a bundle if any single write fails with a non-retryable error.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PDone sink = writes
+ *     .apply(FirestoreIO.v1().write().batchWrite().build());
+ * }</pre>
+ *
+ * Alternatively, if you'd rather output write failures to a Dead Letter Queue, add
+ * {@link BatchWrite.Builder#withDeadLetterQueue() withDeadLetterQueue} when building your writer.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PCollection<WriteFailure> writeFailures = writes
+ *     .apply(FirestoreIO.v1().write().batchWrite().withDeadLetterQueue().build());
+ * }</pre>
+ *
+ * <h3>Permissions</h3>
+ *
+ * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of corresponding {@code PipelineRunner}s for more
+ * details.
+ *
+ * <p>Please see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/quickstart-servers#create_a_in_native_mode_database">Create
+ * a Firestore in Native mode database
+ * </a> for security- and permission-related information specific to Cloud Firestore.
+ *
+ * <p>Optionally, a locally running Cloud Firestore V1 Emulator can be used for testing by
+ * providing its host and port via {@link FirestoreIOOptions#setEmulatorHost(String)}.
+ * In that case, all Cloud Firestore API calls are directed to the Emulator.
+ *
+ * @see FirestoreIO#v1()
+ * @see org.apache.beam.sdk.PipelineRunner
+ * @see org.apache.beam.sdk.options.PipelineOptions
+ * @see org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>
+ */
+@Immutable
+public final class FirestoreV1 {
+  static final FirestoreV1 INSTANCE = new FirestoreV1();
+
+  private FirestoreV1() {}
+
+  /**
+   * The class returned by this method provides the ability to create {@link PTransform PTransforms}
+   * for write operations available in the Firestore V1 API provided by {@link
+   * com.google.cloud.firestore.spi.v1.FirestoreRpc FirestoreRpc}.
+   * <p/>
+   * This method is part of the Firestore Connector DSL and should be accessed via {@link
+   * FirestoreIO#v1()}.
+   * <p/>
+   *
+   * @return Type safe builder factory for write operations.
+   * @see FirestoreIO#v1()
+   */
+  public final Write write() {
+    return Write.INSTANCE;
+  }
+
+  /**
+   * Type safe builder factory for write operations.
+   * <p/>
+   * This class is part of the Firestore Connector DSL and should be accessed via {@link #write()
+   * FirestoreIO.v1().write()}.
+   * <p/>
+   * <p/>
+   * This class provides access to a set of type safe builders for supported write operations
+   * available in the Firestore V1 API accessed through {@link com.google.cloud.firestore.spi.v1.FirestoreRpc
+   * FirestoreRpc}. Each builder allows configuration before creating an immutable instance which
+   * can be used in your pipeline.
+   *
+   * @see FirestoreIO#v1()
+   * @see #write()
+   */
+  @Immutable
+  public static final class Write {
+    private static final Write INSTANCE = new Write();
+
+    private Write() {}
+
+    /**
+     * Factory method to create a new type safe builder for {@link com.google.firestore.v1.Write}
+     * operations.
+     * <p/>
+     * By default, when an error is encountered while trying to write to Cloud Firestore, a {@link
+     * FailedWritesException} will be thrown. If you would like a failed write to not result in a
+     * {@link FailedWritesException}, you can instead use {@link BatchWriteWithDeadLetterQueue} which
+     * will output any failed write. {@link BatchWriteWithDeadLetterQueue} can be used by
+     * including {@link BatchWrite.Builder#withDeadLetterQueue()} when constructing the write handler.
+     * <p/>
+     * This method is part of the Firestore Connector DSL and should be accessed via {@link
+     * FirestoreIO#v1()}.
+     * <p/>
+     *
+     * All request quality-of-service for the built {@link BatchWrite} PTransform is scoped to
+     * the worker and configured based on the {@link RpcQosOptions} specified via this builder.
+     *
+     * @return A new type safe builder providing configuration for processing of {@link
+     * com.google.firestore.v1.Write}s
+     * @see FirestoreIO#v1()
+     * @see BatchWrite
+     * @see BatchWriteRequest
+     * @see com.google.firestore.v1.BatchWriteResponse
+     * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite">google.firestore.v1.Firestore.BatchWrite</a>
+     * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest">google.firestore.v1.BatchWriteRequest</a>
+     * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse">google.firestore.v1.BatchWriteResponse</a>
+     */
+    public BatchWrite.Builder batchWrite() {
+      return new BatchWrite.Builder();
+    }
+  }
+
+  /**
+   * Concrete class representing a {@link PTransform}{@code <}{@link PCollection}{@code <}{@link
+   * com.google.firestore.v1.Write}{@code >, }{@link PDone}{@code >} which will write to Firestore.
+   * <p/>
+   * If an error is encountered while trying to write to Cloud Firestore, a {@link
+   * FailedWritesException} will be thrown. If you would like a failed write to not result in a
+   * {@link FailedWritesException}, you can instead use {@link BatchWriteWithDeadLetterQueue}, which
+   * will output any failed write instead. {@link BatchWriteWithDeadLetterQueue} can be used by
+   * including {@link Builder#withDeadLetterQueue()} when constructing the write handler.
+   * <p/>
+   * This class is part of the Firestore Connector DSL, it has a type safe builder accessible via
+   * {@link FirestoreIO#v1()}{@code .}{@link FirestoreV1#write() write()}{@code .}{@link
+   * FirestoreV1.Write#batchWrite() batchWrite()}.
+   * <p/>
+   * All request quality-of-service for an instance of this PTransform is scoped to the worker and
+   * configured via {@link Builder#withRpcQosOptions(RpcQosOptions)}.
+   * <p/>
+   * Writes performed against Firestore will be ordered and grouped to maximize throughput while
+   * maintaining a high request success rate. Batch sizes will be determined by the QoS layer.
+   *
+   * @see FirestoreIO#v1()
+   * @see FirestoreV1#write()
+   * @see FirestoreV1.Write#batchWrite()
+   * @see BatchWrite.Builder
+   * @see BatchWriteWithDeadLetterQueue
+   * @see BatchWriteRequest
+   * @see com.google.firestore.v1.BatchWriteResponse
+   * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.Firestore.BatchWrite">google.firestore.v1.Firestore.BatchWrite</a>
+   * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteRequest">google.firestore.v1.BatchWriteRequest</a>
+   * @see <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1#google.firestore.v1.BatchWriteResponse">google.firestore.v1.BatchWriteResponse</a>
+   */
+  public static final class BatchWrite
+      extends Transform<
+      PCollection<com.google.firestore.v1.Write>,
+      PDone,
+      BatchWrite,
+      BatchWrite.Builder
+      > {
+
+    private BatchWrite(JodaClock clock,
+        FirestoreStatefulComponentFactory firestoreStatefulComponentFactory, RpcQosOptions rpcQosOptions) {
+      super(clock, firestoreStatefulComponentFactory, rpcQosOptions);
+    }
+
+    @Override
+    public PDone expand(PCollection<com.google.firestore.v1.Write> input) {
+      input.apply("batchWrite", ParDo.of(new DefaultBatchWriteFn(clock, firestoreStatefulComponentFactory, rpcQosOptions, CounterFactory.DEFAULT)));

Review comment:
       Are writes to Firestore idempotent? Note that bundles of this write step may fail and be retried by the runner, so if the writes are not idempotent you'll need to handle this to prevent duplicate data from being written.
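A toy model of the concern above, assuming nothing about Firestore itself: `RetrySketch`, `ToyWrite` and `applyWithRetry` are hypothetical names, and an in-memory map stands in for the database. A bundle is applied once, assumed to fail afterwards, and then replayed in full, as a runner may do on retry.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RetrySketch {
  /** A toy write: either overwrite a field with a value, or add a delta to it. */
  static final class ToyWrite {
    final String field;
    final long amount;
    final boolean increment;

    ToyWrite(String field, long amount, boolean increment) {
      this.field = field;
      this.amount = amount;
      this.increment = increment;
    }
  }

  static void apply(Map<String, Long> store, ToyWrite w) {
    if (w.increment) {
      // Not idempotent: every replay adds the delta again.
      store.merge(w.field, w.amount, Long::sum);
    } else {
      // Idempotent: replaying the overwrite yields the same value.
      store.put(w.field, w.amount);
    }
  }

  /** Applies the bundle once, then replays all of it, as a runner might after a bundle failure. */
  static Map<String, Long> applyWithRetry(List<ToyWrite> bundle) {
    Map<String, Long> store = new HashMap<>();
    bundle.forEach(w -> apply(store, w)); // first attempt; the bundle is then assumed to fail
    bundle.forEach(w -> apply(store, w)); // the runner retries the whole bundle
    return store;
  }
}
```

With one overwrite of `name` to 7 and one increment of `count` by 1, the retried bundle leaves `name` at 7 but `count` at 2. Overwrites keyed by a stable document name behave like the first case; anything that accumulates on prior state behaves like the second.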

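The failure handling described in the `BatchWrite` Javadoc above can be modeled in plain Java. This is a minimal sketch, not the connector's code: `WriteOutcome`, `FailedWritesSketchException`, `FailureHandlingSketch` and `handleOutcomes` are hypothetical names, and a `Consumer` stands in for the dead-letter output of `BatchWriteWithDeadLetterQueue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical per-write result: the document name and whether the write succeeded. */
final class WriteOutcome {
  final String documentName;
  final boolean succeeded;

  WriteOutcome(String documentName, boolean succeeded) {
    this.documentName = documentName;
    this.succeeded = succeeded;
  }
}

/** Hypothetical stand-in for the connector's FailedWritesException. */
final class FailedWritesSketchException extends RuntimeException {
  final List<WriteOutcome> failures;

  FailedWritesSketchException(List<WriteOutcome> failures) {
    super(failures.size() + " write(s) failed");
    this.failures = failures;
  }
}

final class FailureHandlingSketch {
  /**
   * Default mode (deadLetter == null): throw if any write failed, which fails the bundle.
   * Dead-letter mode: hand each failed write to the consumer and carry on.
   */
  static void handleOutcomes(List<WriteOutcome> outcomes, Consumer<WriteOutcome> deadLetter) {
    List<WriteOutcome> failures = new ArrayList<>();
    for (WriteOutcome o : outcomes) {
      if (!o.succeeded) {
        failures.add(o);
      }
    }
    if (failures.isEmpty()) {
      return;
    }
    if (deadLetter == null) {
      throw new FailedWritesSketchException(failures);
    }
    failures.forEach(deadLetter);
  }
}
```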
##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java
##########
@@ -0,0 +1,620 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.firestore;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.firestore.v1.BatchWriteRequest;
+import com.google.firestore.v1.WriteResult;
+import com.google.rpc.Status;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.concurrent.Immutable;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.BatchWriteFnWithDeadLetterQueue;
+import org.apache.beam.sdk.io.gcp.firestore.FirestoreV1WriteFn.DefaultBatchWriteFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * {@link FirestoreV1} provides lifecycle-managed {@link PTransform}s for the
+ * <a target="_blank" rel="noopener noreferrer" href="https://cloud.google.com/firestore/docs/reference/rpc/google.firestore.v1">Cloud
+ * Firestore v1 API</a>.
+ * <p/>
+ * This class is part of the Firestore Connector DSL and should be accessed via {@link
+ * FirestoreIO#v1()}.
+ * <p/>
+ * All {@link PTransform}s provided by this API use {@link org.apache.beam.sdk.extensions.gcp.options.GcpOptions
+ * GcpOptions} on {@link org.apache.beam.sdk.options.PipelineOptions PipelineOptions} for
+ * credentials access and projectId resolution. As such, the lifecycle of gRPC clients and project
+ * information is scoped to the bundle level, not the worker level.
+ * <p/>
+ *
+ * <h3>Operations</h3>
+ * <h4>Write</h4>
+ * To write a {@link PCollection} to Cloud Firestore, use {@link FirestoreV1#write()} and then
+ * pick the behavior of the writer.
+ *
+ * Writes use Cloud Firestore's BatchWrite API, which provides fine-grained write semantics.
+ *
+ * The default behavior is to fail a bundle if any single write fails with a non-retryable error.
+ * <pre>{@code
+ * PCollection<Write> writes = ...;
+ * PDone sink = writes
+ *     .apply(FirestoreIO.v1().write().batchWrite().build());

Review comment:
       Please make sure that these examples show any required parameters. 
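The scoping described in the `FirestoreV1` Javadoc (gRPC clients per bundle) and in the `BatchWrite` Javadoc (QoS per worker) can be pictured with the sketch below. It is a hypothetical model that runs without Beam: `startBundle` and `finishBundle` are plain methods that only mirror Beam's `@StartBundle` and `@FinishBundle` hooks, `FakeClient` stands in for a gRPC client, and a static counter stands in for worker-scoped QoS state.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

final class LifecycleSketch {
  /** Hypothetical client; tracks how many instances are currently open. */
  static final class FakeClient implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();

    FakeClient() {
      OPEN.incrementAndGet();
    }

    void send(String write) {
      // A real client would issue an RPC here.
    }

    @Override
    public void close() {
      OPEN.decrementAndGet();
    }
  }

  /** Worker-scoped state: static, so shared by every writer instance in this JVM. */
  static final AtomicLong WORKER_REQUESTS = new AtomicLong();

  /** Bundle-scoped state: created in startBundle and released in finishBundle. */
  private FakeClient client;

  void startBundle() {
    client = new FakeClient();
  }

  void processElement(String write) {
    WORKER_REQUESTS.incrementAndGet();
    client.send(write);
  }

  void finishBundle() {
    client.close();
    client = null;
  }
}
```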

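The default failure rule in the `FirestoreV1` Javadoc (fail the bundle if a write fails with a non-retryable error) can be sketched as a per-write retry loop. Which status codes count as retryable, the attempt limit, and failing once retries are exhausted are all assumptions made for illustration; `RetryPolicySketch` and its members are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.IntFunction;

final class RetryPolicySketch {
  /** A subset of gRPC canonical status codes, by name. */
  enum Code {
    OK,
    INVALID_ARGUMENT,
    NOT_FOUND,
    PERMISSION_DENIED,
    RESOURCE_EXHAUSTED,
    ABORTED,
    UNAVAILABLE,
    DEADLINE_EXCEEDED
  }

  /** Assumption for illustration only: the codes treated as transient. */
  static final Set<Code> RETRYABLE =
      EnumSet.of(Code.RESOURCE_EXHAUSTED, Code.ABORTED, Code.UNAVAILABLE, Code.DEADLINE_EXCEEDED);

  /**
   * Attempts one write until it succeeds, hits a non-retryable code, or runs out of attempts.
   * Throws (failing the bundle) in the last two cases; otherwise returns the attempts used.
   */
  static int writeOrFail(String doc, IntFunction<Code> attempt, int maxAttempts) {
    for (int n = 1; n <= maxAttempts; n++) {
      Code code = attempt.apply(n);
      if (code == Code.OK) {
        return n;
      }
      if (!RETRYABLE.contains(code)) {
        throw new IllegalStateException("non-retryable " + code + " for " + doc);
      }
    }
    // Assumption: exhausting retries is also treated as a bundle failure.
    throw new IllegalStateException("retries exhausted for " + doc);
  }
}
```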



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org