Posted to commits@beam.apache.org by go...@apache.org on 2022/05/16 18:02:46 UTC

[beam] branch master updated: [BEAM-14035 ] Implement BigQuerySchema Read/Write TransformProvider (#17607)

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5888dbf5f [BEAM-14035 ] Implement BigQuerySchema Read/Write TransformProvider (#17607)
ee5888dbf5f is described below

commit ee5888dbf5f14ce7c427b41b5884e342f5e971c7
Author: Damon Douglas <da...@users.noreply.github.com>
AuthorDate: Mon May 16 18:02:39 2022 +0000

    [BEAM-14035 ] Implement BigQuerySchema Read/Write TransformProvider (#17607)
---
 .../transforms/TypedSchemaTransformProvider.java   |   4 +-
 .../TypedSchemaTransformProviderTest.java          |   2 +-
 .../BigQuerySchemaTransformReadConfiguration.java  | 100 ++++++++
 .../BigQuerySchemaTransformReadProvider.java       | 177 +++++++++++++
 .../BigQuerySchemaTransformWriteConfiguration.java |  86 +++++++
 .../BigQuerySchemaTransformWriteProvider.java      | 279 +++++++++++++++++++++
 .../BigQuerySchemaTransformReadProviderTest.java   | 252 +++++++++++++++++++
 .../BigQuerySchemaTransformWriteProviderTest.java  | 256 +++++++++++++++++++
 8 files changed, 1153 insertions(+), 3 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
index c237d03e052..944f33a596d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
@@ -42,13 +42,13 @@ import org.apache.beam.sdk.values.Row;
 @Experimental(Kind.SCHEMAS)
 public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {
 
-  abstract Class<ConfigT> configurationClass();
+  protected abstract Class<ConfigT> configurationClass();
 
   /**
    * Produce a SchemaTransform from ConfigT. Can throw an {@link InvalidConfigurationException}
    * or an {@link InvalidSchemaException}.
    */
-  abstract SchemaTransform from(ConfigT configuration);
+  protected abstract SchemaTransform from(ConfigT configuration);
 
   /**
    * List the dependencies needed for this transform. Jars from classpath are used by default when
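
Note: widening configurationClass() and from() from package-private to protected is what allows
providers in other packages, such as the BigQuery providers added below, to extend
TypedSchemaTransformProvider. A minimal sketch of such a subclass (MyConfiguration and
MySchemaTransform are hypothetical placeholders, not part of this commit):

    public class MySchemaTransformProvider
        extends TypedSchemaTransformProvider<MyConfiguration> {

      @Override
      protected Class<MyConfiguration> configurationClass() {
        return MyConfiguration.class;
      }

      @Override
      protected SchemaTransform from(MyConfiguration configuration) {
        // Translate the configuration into a SchemaTransform.
        return new MySchemaTransform(configuration);
      }

      // identifier(), inputCollectionNames(), and outputCollectionNames() from
      // SchemaTransformProvider are omitted for brevity.
    }
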
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
index e4018d3b6c1..744b4f3bf0b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProviderTest.java
@@ -63,7 +63,7 @@ public class TypedSchemaTransformProviderTest {
     }
 
     @Override
-    Class<Configuration> configurationClass() {
+    protected Class<Configuration> configurationClass() {
       return Configuration.class;
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
new file mode 100644
index 00000000000..f964a87be16
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  /** Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /** Serializes configuration to a {@link Row}. */
+  Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  /** Configures the BigQuery read job with the SQL query. */
+  @Nullable
+  public abstract String getQuery();
+
+  /**
+   * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+   * more details on the expected format.
+   */
+  @Nullable
+  public abstract String getTableSpec();
+
+  /** BigQuery geographic location where the query job will be executed. */
+  @Nullable
+  public abstract String getQueryLocation();
+
+  /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+  @Nullable
+  public abstract Boolean getUseStandardSql();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    /** Configures the BigQuery read job with the SQL query. */
+    public abstract Builder setQuery(String value);
+
+    /**
+     * Specifies a table for a BigQuery read job. See {@link BigQueryIO.TypedRead#from(String)} for
+     * more details on the expected format.
+     */
+    public abstract Builder setTableSpec(String value);
+
+    /** BigQuery geographic location where the query job will be executed. */
+    public abstract Builder setQueryLocation(String value);
+
+    /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+    public abstract Builder setUseStandardSql(Boolean value);
+
+    /** Builds a {@link BigQuerySchemaTransformReadConfiguration} instance. */
+    public abstract BigQuerySchemaTransformReadConfiguration build();
+  }
+}
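
For reference, a minimal sketch of how this configuration is built, mirroring the tests added
below (the table spec and query are illustrative; toBeamRow() is package-private, so it is only
callable from within org.apache.beam.sdk.io.gcp.bigquery, as the tests do):

    // Read from a table.
    BigQuerySchemaTransformReadConfiguration tableConfiguration =
        BigQuerySchemaTransformReadConfiguration.builder()
            .setTableSpec("myproject.mydataset.mytable")
            .build();

    // Or read from a query using Standard SQL.
    BigQuerySchemaTransformReadConfiguration queryConfiguration =
        BigQuerySchemaTransformReadConfiguration.builder()
            .setQuery("select * from `myproject.mydataset.mytable`")
            .setUseStandardSql(true)
            .build();

    // Serialize to a Beam Row for use with the SchemaTransformProvider below.
    Row configurationRow = tableConfiguration.toBeamRow();
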
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
new file mode 100644
index 00000000000..3ffc4dfa1c0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  private static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements the {@link SchemaTransform#buildTransform()} method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @VisibleForTesting
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();
+      }
+
+      PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(read);
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection.setRowSchema(schema));
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      BigQueryIO.TypedRead<TableRow> read = BigQueryIO.readTableRowsWithSchema();
+
+      if (!Strings.isNullOrEmpty(configuration.getQuery())) {
+        read = read.fromQuery(configuration.getQuery());
+      }
+
+      if (!Strings.isNullOrEmpty(configuration.getTableSpec())) {
+        read = read.from(configuration.getTableSpec());
+      }
+
+      if (configuration.getUseStandardSql() != null && configuration.getUseStandardSql()) {
+        read = read.usingStandardSql();
+      }
+
+      if (!Strings.isNullOrEmpty(configuration.getQueryLocation())) {
+        read = read.withQueryLocation(configuration.getQueryLocation());
+      }
+
+      return read;
+    }
+  }
+}
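
Putting the provider and configuration together, a hedged usage sketch (assumes a Pipeline named
pipeline and the configurationRow built from the configuration class above; "OUTPUT" is the
provider's single output tag):

    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
    SchemaTransform schemaTransform = provider.from(configurationRow);
    PCollectionRowTuple output =
        PCollectionRowTuple.empty(pipeline).apply(schemaTransform.buildTransform());
    PCollection<Row> rows = output.get("OUTPUT");
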
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
new file mode 100644
index 00000000000..5cbea3c49f0
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformWriteConfiguration {
+
+  /** Instantiates a {@link BigQuerySchemaTransformWriteConfiguration.Builder}. */
+  public static Builder builder() {
+    return new AutoValue_BigQuerySchemaTransformWriteConfiguration.Builder();
+  }
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformWriteConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformWriteConfiguration.class);
+  private static final SerializableFunction<BigQuerySchemaTransformWriteConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  /**
+   * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)} for the
+   * expected format.
+   */
+  public abstract String getTableSpec();
+
+  /** Specifies whether the table should be created if it does not exist. */
+  public abstract String getCreateDisposition();
+
+  /** Specifies what to do with existing data in the table, in case the table already exists. */
+  public abstract String getWriteDisposition();
+
+  /** Serializes configuration to a {@link Row}. */
+  Row toBeamRow() {
+    return ROW_SERIALIZABLE_FUNCTION.apply(this);
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    /**
+     * Writes to the given table specification. See {@link BigQueryIO.Write#to(String)} for the
+     * expected format.
+     */
+    public abstract Builder setTableSpec(String value);
+
+    /** Specifies whether the table should be created if it does not exist. */
+    public abstract Builder setCreateDisposition(String value);
+
+    /** Specifies what to do with existing data in the table, in case the table already exists. */
+    public abstract Builder setWriteDisposition(String value);
+
+    /** Builds a {@link BigQuerySchemaTransformWriteConfiguration} instance. */
+    public abstract BigQuerySchemaTransformWriteConfiguration build();
+  }
+}
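
A minimal sketch of building this configuration (the table spec is illustrative; note that the
dispositions are stored as the string names of BigQueryIO.Write.CreateDisposition and
BigQueryIO.Write.WriteDisposition values, as in the tests below):

    BigQuerySchemaTransformWriteConfiguration configuration =
        BigQuerySchemaTransformWriteConfiguration.builder()
            .setTableSpec("myproject.mydataset.mytable")
            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
            .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
            .build();
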
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
new file mode 100644
index 00000000000..bd8d6adbd1a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:write", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * a single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * A {@link SchemaTransform} that performs {@link BigQueryIO.Write}s based on a {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  private static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /**
+     * Overrides {@link SchemaTransform#buildTransform()} by returning a {@link
+     * PCollectionRowTupleTransform}.
+     */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    /** An instance of {@link BigQueryServices} used for testing. */
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
+        return;
+      }
+
+      BigQueryOptions bigQueryOptions = options.as(BigQueryOptions.class);
+
+      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
+      if (testBigQueryServices != null) {
+        bigQueryServices = testBigQueryServices;
+      }
+
+      DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
+      TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
+
+      try {
+        Table table = datasetService.getTable(tableReference);
+        if (table == null) {
+          throw new NullPointerException();
+        }
+
+        if (table.getSchema() == null) {
+          throw new InvalidConfigurationException(
+              String.format("could not fetch schema for table: %s", configuration.getTableSpec()));
+        }
+
+      } catch (NullPointerException | InterruptedException | IOException ex) {
+        throw new InvalidConfigurationException(
+            String.format(
+                "could not fetch table %s, error: %s",
+                configuration.getTableSpec(), ex.getMessage()));
+      }
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      validate(input);
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      BigQueryIO.Write<TableRow> write = toWrite(schema);
+      if (testBigQueryServices != null) {
+        write = write.withTestServices(testBigQueryServices);
+      }
+
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(write);
+      return PCollectionRowTuple.empty(input.getPipeline());
+    }
+
+    /** Instantiates a {@link BigQueryIO.Write} of {@link TableRow} from a {@link Schema}. */
+    BigQueryIO.Write<TableRow> toWrite(Schema schema) {
+      TableSchema tableSchema = BigQueryUtils.toTableSchema(schema);
+      CreateDisposition createDisposition =
+          CreateDisposition.valueOf(configuration.getCreateDisposition());
+      WriteDisposition writeDisposition =
+          WriteDisposition.valueOf(configuration.getWriteDisposition());
+
+      return BigQueryIO.writeTableRows()
+          .to(configuration.getTableSpec())
+          .withCreateDisposition(createDisposition)
+          .withWriteDisposition(writeDisposition)
+          .withSchema(tableSchema);
+    }
+
+    /** Setter for testing using {@link BigQueryServices}. */
+    @VisibleForTesting
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    /** Validate a {@link PCollectionRowTuple} input. */
+    void validate(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+
+      PCollection<Row> rowInput = input.get(INPUT_TAG);
+      Schema sourceSchema = rowInput.getSchema();
+
+      if (sourceSchema == null) {
+        throw new IllegalArgumentException(
+            String.format("%s is null for input of tag: %s", Schema.class, INPUT_TAG));
+      }
+
+      if (!configuration.getCreateDisposition().equals(CreateDisposition.CREATE_NEVER.name())) {
+        return;
+      }
+
+      BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+      BigQueryServices bigQueryServices = new BigQueryServicesImpl();
+      if (testBigQueryServices != null) {
+        bigQueryServices = testBigQueryServices;
+      }
+
+      DatasetService datasetService = bigQueryServices.getDatasetService(bigQueryOptions);
+      TableReference tableReference = BigQueryUtils.toTableReference(configuration.getTableSpec());
+
+      try {
+        Table table = datasetService.getTable(tableReference);
+        if (table == null) {
+          throw new NullPointerException();
+        }
+
+        TableSchema tableSchema = table.getSchema();
+        if (tableSchema == null) {
+          throw new NullPointerException();
+        }
+
+        Schema destinationSchema = BigQueryUtils.fromTableSchema(tableSchema);
+        if (destinationSchema == null) {
+          throw new NullPointerException();
+        }
+
+        validateMatching(sourceSchema, destinationSchema);
+
+      } catch (NullPointerException | InterruptedException | IOException e) {
+        throw new InvalidConfigurationException(
+            String.format(
+                "could not validate input for create disposition: %s and table: %s, error: %s",
+                configuration.getCreateDisposition(),
+                configuration.getTableSpec(),
+                e.getMessage()));
+      }
+    }
+
+    void validateMatching(Schema sourceSchema, Schema destinationSchema) {
+      if (!sourceSchema.equals(destinationSchema)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "source and destination schema mismatch for table: %s",
+                configuration.getTableSpec()));
+      }
+    }
+  }
+}
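
A corresponding usage sketch for the write path (assumes a PCollection<Row> named rows that has
a Beam row schema, and the configuration built above; "INPUT" is the provider's single input tag,
and validation compares the input schema against the destination table when the create
disposition is CREATE_NEVER):

    SchemaTransformProvider provider = new BigQuerySchemaTransformWriteProvider();
    SchemaTransform schemaTransform = provider.from(configuration.toBeamRow());
    PCollectionRowTuple.of("INPUT", rows).apply(schemaTransform.buildTransform());
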
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
new file mode 100644
index 00000000000..5bca0cb2ad3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
+import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final String PROJECT = "fakeproject";
+  private static final String DATASET = "fakedataset";
+  private static final String TABLE_ID = "faketable";
+
+  private static final String QUERY = "select * from `fakeproject.fakedataset.faketable`";
+  private static final String LOCATION = "kingdom-of-figaro";
+
+  private static final TableReference TABLE_REFERENCE =
+      new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
+
+  private static final String TABLE_SPEC = BigQueryHelpers.toTableSpec(TABLE_REFERENCE);
+
+  private static final Schema SCHEMA =
+      Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
+
+  private static final List<TableRow> RECORDS =
+      Arrays.asList(
+          new TableRow().set("name", "a").set("number", 1L),
+          new TableRow().set("name", "b").set("number", 2L),
+          new TableRow().set("name", "c").set("number", 3L));
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build());
+
+  private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA);
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final Table fakeTable = new Table();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException, ExecutionException {
+    FakeDatasetService.setUp();
+    FakeJobService.setUp();
+    BigQueryIO.clearStaticCaches();
+    fakeTable.setSchema(TABLE_SCHEMA);
+    fakeTable.setTableReference(TABLE_REFERENCE);
+    fakeDatasetService.createDataset(PROJECT, DATASET, LOCATION, "", null);
+    fakeDatasetService.createTable(fakeTable);
+    fakeDatasetService.insertAll(fakeTable.getTableReference(), RECORDS, null);
+    temporaryFolder.create();
+    OPTIONS.setProject(PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule
+  public transient TestPipeline p =
+      TestPipeline.fromOptions(OPTIONS).enableAbandonedNodeEnforcement(false);
+
+  @Test
+  public void testQuery() {
+    // Previous attempts using FakeBigQueryServices with a Read configuration using a query failed.
+    // For now, we test using DisplayData and the toTypedRead method.
+    List<Pair<BigQuerySchemaTransformReadConfiguration.Builder, TypedRead<TableRow>>> cases =
+        Arrays.asList(
+            Pair.of(
+                BigQuerySchemaTransformReadConfiguration.builder().setQuery(QUERY),
+                BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY)),
+            Pair.of(
+                BigQuerySchemaTransformReadConfiguration.builder()
+                    .setQuery(QUERY)
+                    .setQueryLocation(LOCATION),
+                BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY).withQueryLocation(LOCATION)),
+            Pair.of(
+                BigQuerySchemaTransformReadConfiguration.builder()
+                    .setQuery(QUERY)
+                    .setUseStandardSql(true),
+                BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY).usingStandardSql()),
+            Pair.of(
+                BigQuerySchemaTransformReadConfiguration.builder()
+                    .setQuery(QUERY)
+                    .setUseStandardSql(false),
+                BigQueryIO.readTableRowsWithSchema().fromQuery(QUERY)));
+
+    for (Pair<BigQuerySchemaTransformReadConfiguration.Builder, TypedRead<TableRow>> caze : cases) {
+      Map<Identifier, Item> want = DisplayData.from(caze.getRight()).asMap();
+      SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+      BigQuerySchemaTransformReadConfiguration configuration = caze.getLeft().build();
+      Row configurationRow = configuration.toBeamRow();
+      SchemaTransform schemaTransform = provider.from(configurationRow);
+      PCollectionRowTupleTransform pCollectionRowTupleTransform =
+          (PCollectionRowTupleTransform) schemaTransform.buildTransform();
+      Map<Identifier, Item> got =
+          DisplayData.from(pCollectionRowTupleTransform.toTypedRead()).asMap();
+      assertEquals(want, got);
+    }
+  }
+
+  @Test
+  public void testExtract() {
+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    BigQuerySchemaTransformReadConfiguration configuration =
+        BigQuerySchemaTransformReadConfiguration.builder().setTableSpec(TABLE_SPEC).build();
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PCollectionRowTupleTransform pCollectionRowTupleTransform =
+        (PCollectionRowTupleTransform) schemaTransform.buildTransform();
+
+    pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices);
+    PCollectionRowTuple input = PCollectionRowTuple.empty(p);
+    String tag = provider.outputCollectionNames().get(0);
+    PCollectionRowTuple output = input.apply(pCollectionRowTupleTransform);
+    assertTrue(output.has(tag));
+    PCollection<Row> got = output.get(tag);
+    PAssert.that(got).containsInAnyOrder(ROWS);
+
+    p.run();
+  }
+
+  @Test
+  public void testInvalidConfiguration() {
+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    for (Pair<
+            BigQuerySchemaTransformReadConfiguration.Builder,
+            ? extends Class<? extends RuntimeException>>
+        caze :
+            Arrays.asList(
+                Pair.of(
+                    BigQuerySchemaTransformReadConfiguration.builder(),
+                    IllegalArgumentException.class),
+                Pair.of(
+                    BigQuerySchemaTransformReadConfiguration.builder()
+                        .setQuery(QUERY)
+                        .setTableSpec(TABLE_SPEC),
+                    IllegalStateException.class),
+                Pair.of(
+                    BigQuerySchemaTransformReadConfiguration.builder().setQueryLocation(LOCATION),
+                    IllegalArgumentException.class),
+                Pair.of(
+                    BigQuerySchemaTransformReadConfiguration.builder().setUseStandardSql(true),
+                    IllegalArgumentException.class))) {
+      Row configurationRow = caze.getLeft().build().toBeamRow();
+      SchemaTransform schemaTransform = provider.from(configurationRow);
+      PCollectionRowTupleTransform pCollectionRowTupleTransform =
+          (PCollectionRowTupleTransform) schemaTransform.buildTransform();
+      pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices);
+      PCollectionRowTuple empty = PCollectionRowTuple.empty(p);
+      assertThrows(caze.getRight(), () -> empty.apply(pCollectionRowTupleTransform));
+    }
+  }
+
+  @Test
+  public void testInvalidInput() {
+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    BigQuerySchemaTransformReadConfiguration configuration =
+        BigQuerySchemaTransformReadConfiguration.builder().setTableSpec(TABLE_SPEC).build();
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PCollectionRowTupleTransform pCollectionRowTupleTransform =
+        (PCollectionRowTupleTransform) schemaTransform.buildTransform();
+
+    pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices);
+    PCollectionRowTuple input = PCollectionRowTuple.of("badinput", p.apply(Create.of(ROWS)));
+    assertThrows(IllegalArgumentException.class, () -> input.apply(pCollectionRowTupleTransform));
+  }
+
+  private void assertEquals(Map<Identifier, Item> want, Map<Identifier, Item> got) {
+    Set<Identifier> keys = new HashSet<>();
+    keys.addAll(want.keySet());
+    keys.addAll(got.keySet());
+    for (Identifier key : keys) {
+      Item wantItem = null;
+      Item gotItem = null;
+      if (want.containsKey(key)) {
+        wantItem = want.get(key);
+      }
+      if (got.containsKey(key)) {
+        gotItem = got.get(key);
+      }
+      Assert.assertEquals(wantItem, gotItem);
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java
new file mode 100644
index 00000000000..b752b182ebd
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.INPUT_TAG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
+import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/** Test for {@link BigQuerySchemaTransformWriteProvider}. */
+public class BigQuerySchemaTransformWriteProviderTest {
+
+  private static final String PROJECT = "fakeproject";
+  private static final String DATASET = "fakedataset";
+  private static final String TABLE_ID = "faketable";
+
+  private static final TableReference TABLE_REFERENCE =
+      new TableReference().setProjectId(PROJECT).setDatasetId(DATASET).setTableId(TABLE_ID);
+
+  private static final Schema SCHEMA =
+      Schema.of(Field.of("name", FieldType.STRING), Field.of("number", FieldType.INT64));
+
+  private static final TableSchema TABLE_SCHEMA = BigQueryUtils.toTableSchema(SCHEMA);
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 2L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", "c").withFieldValue("number", 3L).build());
+
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    fakeDatasetService.createDataset(PROJECT, DATASET, "", "", null);
+    temporaryFolder.create();
+    OPTIONS.setProject(PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testLoad() throws IOException, InterruptedException {
+    SchemaTransformProvider provider = new BigQuerySchemaTransformWriteProvider();
+    BigQuerySchemaTransformWriteConfiguration configuration =
+        BigQuerySchemaTransformWriteConfiguration.builder()
+            .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+            .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name())
+            .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+            .build();
+    Row configurationRow = configuration.toBeamRow();
+    SchemaTransform schemaTransform = provider.from(configurationRow);
+    PCollectionRowTupleTransform pCollectionRowTupleTransform =
+        (PCollectionRowTupleTransform) schemaTransform.buildTransform();
+    pCollectionRowTupleTransform.setTestBigQueryServices(fakeBigQueryServices);
+    String tag = provider.inputCollectionNames().get(0);
+    PCollectionRowTuple input =
+        PCollectionRowTuple.of(tag, p.apply(Create.of(ROWS).withRowSchema(SCHEMA)));
+    input.apply(pCollectionRowTupleTransform);
+
+    p.run();
+
+    assertNotNull(fakeDatasetService.getTable(TABLE_REFERENCE));
+    assertEquals(ROWS.size(), fakeDatasetService.getAllRows(PROJECT, DATASET, TABLE_ID).size());
+  }
+
+  @Test
+  public void testValidatePipelineOptions() {
+    List<Pair<BigQuerySchemaTransformWriteConfiguration.Builder, Class<? extends Exception>>>
+        cases =
+            Arrays.asList(
+                Pair.of(
+                    BigQuerySchemaTransformWriteConfiguration.builder()
+                        .setTableSpec("project.doesnot.exist")
+                        .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
+                        .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
+                    InvalidConfigurationException.class),
+                Pair.of(
+                    BigQuerySchemaTransformWriteConfiguration.builder()
+                        .setTableSpec(String.format("%s.%s.%s", PROJECT, DATASET, "doesnotexist"))
+                        .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
+                        .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
+                    InvalidConfigurationException.class),
+                Pair.of(
+                    BigQuerySchemaTransformWriteConfiguration.builder()
+                        .setTableSpec("project.doesnot.exist")
+                        .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+                        .setWriteDisposition(WriteDisposition.WRITE_APPEND.name()),
+                    null));
+    for (Pair<BigQuerySchemaTransformWriteConfiguration.Builder, Class<? extends Exception>> caze :
+        cases) {
+      PCollectionRowTupleTransform transform = transformFrom(caze.getLeft().build());
+      if (caze.getRight() != null) {
+        assertThrows(caze.getRight(), () -> transform.validate(p.getOptions()));
+      } else {
+        transform.validate(p.getOptions());
+      }
+    }
+  }
+
+  @Test
+  public void testToWrite() {
+    List<Pair<BigQuerySchemaTransformWriteConfiguration.Builder, BigQueryIO.Write<TableRow>>>
+        cases =
+            Arrays.asList(
+                Pair.of(
+                    BigQuerySchemaTransformWriteConfiguration.builder()
+                        .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+                        .setCreateDisposition(CreateDisposition.CREATE_NEVER.name())
+                        .setWriteDisposition(WriteDisposition.WRITE_EMPTY.name()),
+                    BigQueryIO.writeTableRows()
+                        .to(TABLE_REFERENCE)
+                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+                        .withWriteDisposition(WriteDisposition.WRITE_EMPTY)
+                        .withSchema(TABLE_SCHEMA)),
+                Pair.of(
+                    BigQuerySchemaTransformWriteConfiguration.builder()
+                        .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+                        .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+                        .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE.name()),
+                    BigQueryIO.writeTableRows()
+                        .to(TABLE_REFERENCE)
+                        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+                        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
+                        .withSchema(TABLE_SCHEMA)));
+    for (Pair<BigQuerySchemaTransformWriteConfiguration.Builder, BigQueryIO.Write<TableRow>> caze :
+        cases) {
+      PCollectionRowTupleTransform transform = transformFrom(caze.getLeft().build());
+      Map<Identifier, Item> gotDisplayData = DisplayData.from(transform.toWrite(SCHEMA)).asMap();
+      Map<Identifier, Item> wantDisplayData = DisplayData.from(caze.getRight()).asMap();
+      Set<Identifier> keys = new HashSet<>();
+      keys.addAll(gotDisplayData.keySet());
+      keys.addAll(wantDisplayData.keySet());
+      for (Identifier key : keys) {
+        Item got = null;
+        Item want = null;
+        if (gotDisplayData.containsKey(key)) {
+          got = gotDisplayData.get(key);
+        }
+        if (wantDisplayData.containsKey(key)) {
+          want = wantDisplayData.get(key);
+        }
+        assertEquals(want, got);
+      }
+    }
+  }
+
+  @Test
+  public void validatePCollectionRowTupleInput() {
+    PCollectionRowTuple empty = PCollectionRowTuple.empty(p);
+    PCollectionRowTuple valid =
+        PCollectionRowTuple.of(
+            INPUT_TAG, p.apply("CreateRowsWithValidSchema", Create.of(ROWS)).setRowSchema(SCHEMA));
+
+    PCollectionRowTuple invalid =
+        PCollectionRowTuple.of(
+            INPUT_TAG,
+            p.apply(
+                "CreateRowsWithInvalidSchema",
+                Create.of(
+                    Row.nullRow(
+                        Schema.builder().addNullableField("name", FieldType.STRING).build()))));
+
+    PCollectionRowTupleTransform transform =
+        transformFrom(
+            BigQuerySchemaTransformWriteConfiguration.builder()
+                .setTableSpec(BigQueryHelpers.toTableSpec(TABLE_REFERENCE))
+                .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED.name())
+                .setWriteDisposition(WriteDisposition.WRITE_APPEND.name())
+                .build());
+
+    assertThrows(IllegalArgumentException.class, () -> transform.validate(empty));
+
+    assertThrows(IllegalStateException.class, () -> transform.validate(invalid));
+
+    transform.validate(valid);
+
+    p.run();
+  }
+
+  private PCollectionRowTupleTransform transformFrom(
+      BigQuerySchemaTransformWriteConfiguration configuration) {
+    SchemaTransformProvider provider = new BigQuerySchemaTransformWriteProvider();
+    PCollectionRowTupleTransform transform =
+        (PCollectionRowTupleTransform) provider.from(configuration.toBeamRow()).buildTransform();
+
+    transform.setTestBigQueryServices(fakeBigQueryServices);
+
+    return transform;
+  }
+}