Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/24 23:31:51 UTC

[GitHub] [beam] damondouglas opened a new pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

damondouglas opened a new pull request #17181:
URL: https://github.com/apache/beam/pull/17181


   This PR, currently work-in-progress, closes [BEAM-14035](https://issues.apache.org/jira/browse/BEAM-14035) by creating:
   
   - [x] `BigQuerySchemaTransformReadConfiguration` - Configuration for reading from BigQuery.
   - [x] `BigQuerySchemaTransformReadProvider` - An implementation of `TypedSchemaTransformProvider`.
   - [ ] `BigQuerySchemaTransformWriteConfiguration` - Configuration for writing to BigQuery.
   - [ ] `BigQuerySchemaTransformWriteProvider` - An implementation of `TypedSchemaTransformProvider`.
   
   I would like to request the following to review this PR:
   @laraschmidt 
   @angoenka 
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - ~Update `CHANGES.md` with noteworthy changes.~
    - ~If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).~
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835592220



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
+    if (tableSpec.getProjectId().isEmpty()) {
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",
+            tableSpec.getProjectId(), tableSpec.getDatasetId(), tableSpec.getTableId()));
+  }
+
+  private static Builder defaultBuilder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder()
+        .setJobType(JobType.UNSPECIFIED)
+        .setQuery("")

Review comment:
       @laraschmidt May we consider reverting back to defaulting to empty Strings?
   
   Changing the parameters to Nullable leads to the following errors in the Jenkins output. When I used non-nullable parameters that default to an empty string, I didn't get these errors:
   ```
   /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:155: error: [argument.type.incompatible] incompatible argument for parameter tableSpec of from.
   13:26:39       return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
   13:26:39                                                                                  ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:159: error: [argument.type.incompatible] incompatible argument for parameter arg0 of requireNonNull.
   13:26:39       String query = Objects.requireNonNull(configuration.getQuery());
   13:26:39                                                                   ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:164: error: [argument.type.incompatible] incompatible argument for parameter location of withQueryLocation.
   13:26:39         read = read.withQueryLocation(configuration.getQueryLocation());
   13:26:39                                                                     ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:168: error: [condition.nullable] condition on a possibly-null value (configuration.getUseStandardSql())
   13:26:39         if (configuration.getUseStandardSql()) {
   ```
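
The empty-string default described above can be sketched in isolation. This is a minimal illustration, not the PR's code: `EmptyDefaultConfigSketch` and its methods are hypothetical names, plain Java stands in for the AutoValue builder, and it assumes the empty string is never a meaningful value for either field.

```java
/** Hypothetical configuration that uses "" as the "unset" default (not the PR's class). */
final class EmptyDefaultConfigSketch {
  private final String query;
  private final String tableSpec;

  private EmptyDefaultConfigSketch(String query, String tableSpec) {
    this.query = query;
    this.tableSpec = tableSpec;
  }

  /** Query-based configuration; tableSpec falls back to "" instead of null. */
  static EmptyDefaultConfigSketch ofQuery(String query) {
    return new EmptyDefaultConfigSketch(query, "");
  }

  /** Table-based configuration; query falls back to "" instead of null. */
  static EmptyDefaultConfigSketch ofTableSpec(String tableSpec) {
    return new EmptyDefaultConfigSketch("", tableSpec);
  }

  /** Never returns null, so callers need no null check. */
  String getQuery() {
    return query;
  }

  /** Never returns null, so callers need no null check. */
  String getTableSpec() {
    return tableSpec;
  }

  /** With "" as the sentinel, "unset" is detected by an emptiness check. */
  boolean hasQuery() {
    return !query.isEmpty();
  }

  boolean hasTableSpec() {
    return !tableSpec.isEmpty();
  }

  public static void main(String[] args) {
    EmptyDefaultConfigSketch config = ofQuery("SELECT 1");
    System.out.println("hasQuery=" + config.hasQuery());
    System.out.println("hasTableSpec=" + config.hasTableSpec());
  }
}
```

With this shape the getters never return null, so call sites pass a nullness checker without extra guards. The cost is that an unset field can only be told apart from a set one through the emptiness check.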







[GitHub] [beam] laraschmidt commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835666539



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQuerySchemaTransformRead(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformRead implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformRead(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformReadTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformReadTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformReadTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(toTypedRead());
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection);
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      JobType jobType = configuration.getJobType();
+      switch (jobType) {
+        case QUERY:
+          return toQueryTypedRead();
+
+        case EXTRACT:
+          return toExtractTypedRead();
+
+        default:
+          throw new InvalidConfigurationException(
+              String.format("invalid job type for BigQueryIO read, got: %s", jobType));
+      }
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+      return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {

Review comment:
       > Should I add the following annotation?
   > 
   > ```
   > @SuppressWarnings({
   >   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
   > })
   > ```
   
   Well, it's calling out a real problem, right? The function being called doesn't handle null, but you might pass it a null. You should probably use Preconditions.check and fail if the value is null when it shouldn't be. I would think that might satisfy the checker, but if not, then we can add that suppression too. But I would expect there's a way to indicate to the checker that we have checked for null?
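
One way to make such a check visible to a flow-sensitive nullness checker is an explicit guard on a local variable. The sketch below is an assumption-laden illustration rather than the PR's code: `ReadConfigSketch` and `NullCheckSketch` are hypothetical stand-ins, and a plain `if`/`throw` is used in place of any particular Preconditions helper.

```java
/** Hypothetical stand-in for a configuration whose getter may return null. */
final class ReadConfigSketch {
  private final String query; // null when the caller never set it

  ReadConfigSketch(String query) {
    this.query = query;
  }

  /** May return null, mirroring a getter annotated as nullable. */
  String getQuery() {
    return query;
  }
}

final class NullCheckSketch {
  /** Fails fast with a descriptive message instead of passing null downstream. */
  static String requireQuery(ReadConfigSketch configuration) {
    // Read the getter once into a local so the null test and the use refer to the same value.
    String query = configuration.getQuery();
    if (query == null) {
      throw new IllegalArgumentException("query must be set for a QUERY job");
    }
    // Past the guard, the local is known to be non-null on every path.
    return query;
  }

  public static void main(String[] args) {
    System.out.println(requireQuery(new ReadConfigSketch("SELECT 1")));
    try {
      requireQuery(new ReadConfigSketch(null));
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The guard reports a configuration error at the point where the value is needed, and it avoids a blanket `@SuppressWarnings("nullness")` on the method.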







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835690796



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.BigQuerySchemaTransformWriteTransform;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
+import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformWriteProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformWriteProviderTest {

Review comment:
       I will make another attempt at FakeBigQueryServices. I like the idea of having a full end-to-end test.







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835688230



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.BigQuerySchemaTransformWriteTransform;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
+import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformWriteProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformWriteProviderTest {

Review comment:
       I wasn't able to get the FakeBigQueryServices working. This is why I relied on the DisplayData in the test. Do you know of anyone who could help with the FakeBigQueryServices if this is important? Or is it sufficient to verify through DisplayData that the right BigQueryIO.Write (or Read in the read test) is configured?







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835690873



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformWriteConfiguration {
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformWriteConfiguration} to support BigQuery load jobs.
+   * See {@link BigQueryIO.Write#to(String)} for toTableSpec expected format.
+   */
+  public static BigQuerySchemaTransformWriteConfiguration createLoad(
+      String toTableSpec, CreateDisposition createDisposition, WriteDisposition writeDisposition) {
+    return new AutoValue_BigQuerySchemaTransformWriteConfiguration.Builder()
+        .setTableSpec(toTableSpec)
+        .setCreateDisposition(createDisposition.name())
+        .setWriteDisposition(writeDisposition.name())
+        .build();
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformWriteConfiguration} to support BigQuery load jobs.
+   */
+  public static BigQuerySchemaTransformWriteConfiguration createLoad(

Review comment:
       I agree and will make this change.







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835594560



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQuerySchemaTransformRead(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformRead implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformRead(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformReadTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformReadTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformReadTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(toTypedRead());
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection);
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      JobType jobType = configuration.getJobType();
+      switch (jobType) {
+        case QUERY:
+          return toQueryTypedRead();
+
+        case EXTRACT:
+          return toExtractTypedRead();
+
+        default:
+          throw new InvalidConfigurationException(
+              String.format("invalid job type for BigQueryIO read, got: %s", jobType));
+      }
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+      return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {

Review comment:
       Implementing the change suggested in [this comment](https://github.com/apache/beam/pull/17181#discussion_r835454867) fails the build with the following nullness errors:
   
   ```
   /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:155: error: [argument.type.incompatible] incompatible argument for parameter tableSpec of from.
   13:26:39       return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
   13:26:39                                                                                  ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:159: error: [argument.type.incompatible] incompatible argument for parameter arg0 of requireNonNull.
   13:26:39       String query = Objects.requireNonNull(configuration.getQuery());
   13:26:39                                                                   ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:164: error: [argument.type.incompatible] incompatible argument for parameter location of withQueryLocation.
   13:26:39         read = read.withQueryLocation(configuration.getQueryLocation());
   13:26:39                                                                     ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:168: error: [condition.nullable] condition on a possibly-null value (configuration.getUseStandardSql())
   13:26:39         if (configuration.getUseStandardSql()) {
   ```
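
   A minimal sketch of one way to satisfy the checker, assuming the configuration getters stay nullable: copy each value into a local variable and test it explicitly before use. The log above shows `Objects.requireNonNull` being rejected as well, so the sketch uses plain `if` checks instead. `NullableReadConfigSketch` and its nested `Config` are hypothetical stand-ins, not the classes in this PR, and the methods return strings that only mirror the `BigQueryIO` call names so the example runs without Beam on the classpath.

   ```java
   public class NullableReadConfigSketch {

     /** Hypothetical stand-in for the read configuration; every getter may return null. */
     public static final class Config {
       private final String tableSpec;
       private final String query;
       private final String queryLocation;
       private final Boolean useStandardSql;

       public Config(String tableSpec, String query, String queryLocation, Boolean useStandardSql) {
         this.tableSpec = tableSpec;
         this.query = query;
         this.queryLocation = queryLocation;
         this.useStandardSql = useStandardSql;
       }

       public String getTableSpec() {
         return tableSpec;
       }

       public String getQuery() {
         return query;
       }

       public String getQueryLocation() {
         return queryLocation;
       }

       public Boolean getUseStandardSql() {
         return useStandardSql;
       }
     }

     /** Describes an extract read; fails fast when the table spec is missing. */
     public static String describeExtractRead(Config configuration) {
       // Reading the getter once into a local lets a null check narrow the type.
       String tableSpec = configuration.getTableSpec();
       if (tableSpec == null) {
         throw new IllegalArgumentException("tableSpec is required for an extract read");
       }
       return "from(" + tableSpec + ")";
     }

     /** Describes a query read; optional settings are applied only when present. */
     public static String describeQueryRead(Config configuration) {
       String query = configuration.getQuery();
       if (query == null) {
         throw new IllegalArgumentException("query is required for a query read");
       }
       StringBuilder read = new StringBuilder("fromQuery(" + query + ")");

       String queryLocation = configuration.getQueryLocation();
       if (queryLocation != null) {
         read.append(".withQueryLocation(").append(queryLocation).append(")");
       }

       // Compare against Boolean.TRUE so that a null Boolean is never unboxed.
       if (Boolean.TRUE.equals(configuration.getUseStandardSql())) {
         read.append(".usingStandardSql()");
       }
       return read.toString();
     }

     public static void main(String[] args) {
       Config config = new Config(null, "select 1", "US", null);
       System.out.println(describeQueryRead(config));
     }
   }
   ```

   Whether a missing value should throw or fall back to a default is a design choice for the real provider; the sketch throws only for the values each job type cannot work without.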




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r841097703



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformWriteTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWriteTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQuerySchemaTransformWriteTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(toWrite(schema));

Review comment:
       I agree.  I will address this in this PR by checking that the Schema aligns with the table, if the table exists.
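
   A rough sketch of what such an alignment check could look like, written without Beam or BigQuery types so it stands alone. `SchemaAlignmentSketch` and `findMismatches` are hypothetical names, and schemas are modelled as plain field-name to type-name maps, which is an assumption made only for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class SchemaAlignmentSketch {

     /**
      * Returns one message per field of rowSchema that is missing from tableSchema or has a
      * different type there. An empty list means every row field aligns with the table.
      */
     public static List<String> findMismatches(
         Map<String, String> rowSchema, Map<String, String> tableSchema) {
       List<String> mismatches = new ArrayList<>();
       for (Map.Entry<String, String> field : rowSchema.entrySet()) {
         String tableType = tableSchema.get(field.getKey());
         if (tableType == null) {
           mismatches.add("missing field: " + field.getKey());
         } else if (!tableType.equals(field.getValue())) {
           mismatches.add(
               String.format(
                   "field %s: expected %s, got %s", field.getKey(), tableType, field.getValue()));
         }
       }
       return mismatches;
     }

     public static void main(String[] args) {
       // Field names follow the test data in this PR; the type clash is deliberate.
       Map<String, String> rowSchema = new LinkedHashMap<>();
       rowSchema.put("name", "STRING");
       rowSchema.put("number", "STRING");

       Map<String, String> tableSchema = new LinkedHashMap<>();
       tableSchema.put("name", "STRING");
       tableSchema.put("number", "INT64");

       System.out.println(findMismatches(rowSchema, tableSchema));
     }
   }
   ```

   The sketch only checks that every row field exists in the table with the same type; whether extra table columns should also count as a mismatch is left open.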







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r841097800



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  private static final String FAKE_PROJECT = "fakeproject";
+  private static final String FAKE_DATASET = "fakedataset";
+  private static final String FAKE_TABLE_ID = "faketable";
+  // private static final String FAKE_QUERY = String.format("select * from `%s.%s.%s`", FAKE_PROJECT,
+  //     FAKE_DATASET, FAKE_TABLE_ID);
+
+  private static final TableReference FAKE_TABLE_REFERENCE = new TableReference()
+      .setProjectId(FAKE_PROJECT)
+      .setDatasetId(FAKE_DATASET)
+      .setTableId(FAKE_TABLE_ID);
+
+  private static final String FAKE_TABLE_SPEC = BigQueryHelpers.toTableSpec(FAKE_TABLE_REFERENCE);
+
+  private static final Schema FAKE_SCHEMA = Schema.of(
+      Field.of("name", FieldType.STRING),
+      Field.of("number", FieldType.INT64)
+  );
+
+  private static final List<TableRow> FAKE_RECORDS = Arrays.asList(
+      new TableRow().set("name", "a").set("number", 1L),
+      new TableRow().set("name", "b").set("number", 2L),
+      new TableRow().set("name", "c").set("number", 3L)
+  );
+
+  private static final List<Row> FAKE_ROWS = Arrays.asList(
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "a")
+          .withFieldValue("number", 1L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "b")
+          .withFieldValue("number", 2L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "c")
+          .withFieldValue("number", 3L)
+          .build()
+  );
+
+  private static final TableSchema FAKE_TABLE_SCHEMA = BigQueryUtils.toTableSchema(FAKE_SCHEMA);
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final Table fakeTable = new Table();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    FakeJobService.setUp();
+    BigQueryIO.clearCreatedTables();
+    fakeTable.setSchema(FAKE_TABLE_SCHEMA);
+    fakeTable.setTableReference(FAKE_TABLE_REFERENCE);
+    fakeTable.setNumBytes(1024L * 1024L);
+    fakeDatasetService.createDataset(FAKE_PROJECT, FAKE_DATASET, "", "", null);
+    fakeDatasetService.createTable(fakeTable);
+    fakeDatasetService.insertAll(fakeTable.getTableReference(), FAKE_RECORDS, null);
+    temporaryFolder.create();
+    OPTIONS.setProject(FAKE_PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testFromExtractConfiguration() {

Review comment:
       I agree.  I will make this change.







[GitHub] [beam] laraschmidt commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835667278



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteConfiguration.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for writing to BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformWriteProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformWriteConfiguration {
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformWriteConfiguration} to support BigQuery load jobs.
+   * See {@link BigQueryIO.Write#to(String)} for the expected format of toTableSpec.
+   */
+  public static BigQuerySchemaTransformWriteConfiguration createLoad(
+      String toTableSpec, CreateDisposition createDisposition, WriteDisposition writeDisposition) {
+    return new AutoValue_BigQuerySchemaTransformWriteConfiguration.Builder()
+        .setTableSpec(toTableSpec)
+        .setCreateDisposition(createDisposition.name())
+        .setWriteDisposition(writeDisposition.name())
+        .build();
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformWriteConfiguration} to support BigQuery load jobs.
+   */
+  public static BigQuerySchemaTransformWriteConfiguration createLoad(

Review comment:
       These are all fields of the builder. Maybe we could drop these methods and let people use the builder directly?
   Also, maybe rename createLoad to write? Unless "load" is exposed as terminology on BQ write, but I didn't think so.
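
   A sketch of what builder-only construction might look like from the caller's side. `WriteConfigurationBuilderSketch` and its nested classes are hand-written, hypothetical stand-ins for the AutoValue-generated code; only the setter names are taken from the diff above, and the disposition values are passed as strings because that is how the factory method stores them.

   ```java
   public class WriteConfigurationBuilderSketch {

     /** Hypothetical stand-in for the AutoValue-generated write configuration. */
     public static final class WriteConfiguration {
       private final String tableSpec;
       private final String createDisposition;
       private final String writeDisposition;

       private WriteConfiguration(Builder builder) {
         this.tableSpec = builder.tableSpec;
         this.createDisposition = builder.createDisposition;
         this.writeDisposition = builder.writeDisposition;
       }

       public static Builder builder() {
         return new Builder();
       }

       public String getTableSpec() {
         return tableSpec;
       }

       public String getCreateDisposition() {
         return createDisposition;
       }

       public String getWriteDisposition() {
         return writeDisposition;
       }

       /** Collects the fields one at a time, so no overloaded factory methods are needed. */
       public static final class Builder {
         private String tableSpec;
         private String createDisposition;
         private String writeDisposition;

         public Builder setTableSpec(String value) {
           this.tableSpec = value;
           return this;
         }

         public Builder setCreateDisposition(String value) {
           this.createDisposition = value;
           return this;
         }

         public Builder setWriteDisposition(String value) {
           this.writeDisposition = value;
           return this;
         }

         public WriteConfiguration build() {
           // All three fields are treated as required in this sketch.
           if (tableSpec == null || createDisposition == null || writeDisposition == null) {
             throw new IllegalStateException(
                 "tableSpec, createDisposition and writeDisposition are required");
           }
           return new WriteConfiguration(this);
         }
       }
     }

     public static void main(String[] args) {
       WriteConfiguration configuration =
           WriteConfiguration.builder()
               .setTableSpec("project:dataset.table")
               .setCreateDisposition("CREATE_IF_NEEDED")
               .setWriteDisposition("WRITE_APPEND")
               .build();
       System.out.println(configuration.getTableSpec());
     }
   }
   ```

   With this shape, adding a field later means adding one setter rather than another factory overload.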

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProviderTest.java
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformWriteProvider.BigQuerySchemaTransformWriteTransform;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Identifier;
+import org.apache.beam.sdk.transforms.display.DisplayData.Item;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformWriteProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformWriteProviderTest {

Review comment:
       Ditto the comments on the read test.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQuerySchemaTransformWrite(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWrite implements SchemaTransform {

Review comment:
       BigQueryWriteSchemaTransform?







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835655245



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQuerySchemaTransformRead(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformRead implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformRead(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformReadTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformReadTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformReadTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(toTypedRead());
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection);
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      JobType jobType = configuration.getJobType();
+      switch (jobType) {
+        case QUERY:
+          return toQueryTypedRead();
+
+        case EXTRACT:
+          return toExtractTypedRead();
+
+        default:
+          throw new InvalidConfigurationException(
+              String.format("invalid job type for BigQueryIO read, got: %s", jobType));
+      }
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+      return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {

Review comment:
       Should I add the following annotation?
   ```
   @SuppressWarnings({
     "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
   })
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835592220



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
+    if (tableSpec.getProjectId().isEmpty()) {
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",
+            tableSpec.getProjectId(), tableSpec.getDatasetId(), tableSpec.getTableId()));
+  }
+
+  private static Builder defaultBuilder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder()
+        .setJobType(JobType.UNSPECIFIED)
+        .setQuery("")

Review comment:
       @laraschmidt May we consider reverting to empty String defaults? Please see https://github.com/apache/beam/pull/17181/files#r835594560







[GitHub] [beam] laraschmidt commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835454475



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
+    if (tableSpec.getProjectId().isEmpty()) {
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",

Review comment:
       This doesn't actually work if the table has a ':' in it, but maybe that's fine?
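
To make the concern concrete, here is a minimal sketch of a stricter formatter, assuming it is acceptable to reject dataset and table IDs that contain the delimiters ':' or '.'. `TableSpecSketch` and `format` are hypothetical names, not part of this PR, and the project ID is deliberately left unchecked. The sketch also treats a null project ID like an empty one, whereas the quoted code calls `isEmpty()` on it directly.

```java
import java.util.Objects;

/** Hypothetical helper, not part of the PR: builds a "[project:]dataset.table" string. */
final class TableSpecSketch {
  private TableSpecSketch() {}

  /**
   * Formats a table spec. A null or empty project ID is omitted. Dataset and table IDs
   * containing ':' or '.' are rejected (an assumption of this sketch), because the
   * resulting string could not be split back into its parts unambiguously.
   */
  static String format(String projectId, String datasetId, String tableId) {
    requireNoDelimiter("datasetId", datasetId);
    requireNoDelimiter("tableId", tableId);
    if (projectId == null || projectId.isEmpty()) {
      return String.format("%s.%s", datasetId, tableId);
    }
    return String.format("%s:%s.%s", projectId, datasetId, tableId);
  }

  private static void requireNoDelimiter(String name, String value) {
    Objects.requireNonNull(value, name);
    if (value.isEmpty() || value.indexOf(':') >= 0 || value.indexOf('.') >= 0) {
      throw new IllegalArgumentException(
          String.format("%s must be non-empty and must not contain ':' or '.', got: %s", name, value));
    }
  }
}
```

With this shape, `TableSpecSketch.format("project", "dataset", "ta:ble")` fails fast with an `IllegalArgumentException` instead of producing a spec that is parsed incorrectly later.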

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
+    if (tableSpec.getProjectId().isEmpty()) {
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",
+            tableSpec.getProjectId(), tableSpec.getDatasetId(), tableSpec.getTableId()));
+  }
+
+  private static Builder defaultBuilder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder()
+        .setJobType(JobType.UNSPECIFIED)
+        .setQuery("")

Review comment:
       Can we make these nullable if they should be null? Schema inference will handle that.
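
As an illustration of the nullable approach, here is a minimal sketch in plain Java rather than AutoValue, so that it stands alone. `ReadConfigSketch` and its methods are hypothetical names, not part of this PR. In the real class the getters would presumably carry a `@Nullable` annotation, and the "exactly one of query or table spec" rule is an assumption about the intended validation.

```java
import java.util.Optional;

/** Hypothetical stand-in for the read configuration; the real class is an AutoValue. */
final class ReadConfigSketch {
  private final String query; // null when reading a whole table
  private final String tableSpec; // null when running a query

  private ReadConfigSketch(String query, String tableSpec) {
    this.query = query;
    this.tableSpec = tableSpec;
  }

  /** Creates a configuration; exactly one argument must be non-null and non-empty. */
  static ReadConfigSketch of(String query, String tableSpec) {
    boolean hasQuery = query != null && !query.isEmpty();
    boolean hasTableSpec = tableSpec != null && !tableSpec.isEmpty();
    if (hasQuery == hasTableSpec) {
      throw new IllegalArgumentException("exactly one of query or tableSpec must be set");
    }
    return new ReadConfigSketch(hasQuery ? query : null, hasTableSpec ? tableSpec : null);
  }

  /** Absent rather than "" when no query was configured. */
  Optional<String> query() {
    return Optional.ofNullable(query);
  }

  /** Absent rather than "" when no table spec was configured. */
  Optional<String> tableSpec() {
    return Optional.ofNullable(tableSpec);
  }
}
```

The point of the sketch is that "not set" is represented by absence, so callers never have to compare against an empty-string sentinel.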

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+@AutoValue
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String TAG = "ToRows";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQuerySchemaTransformRead(configuration);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformRead implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformRead(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    BigQuerySchemaTransformReadConfiguration getConfiguration() {
+      return configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformReadTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformReadTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformReadTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException("PCollectionRowTuple input is expected to be empty");

Review comment:
       Mention BigQuery here so it's more understandable on its own?

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.BigQuerySchemaTransformRead;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final Schema SCHEMA = AUTO_VALUE_SCHEMA.schemaFor(TYPE_DESCRIPTOR);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  @Test
+  public void testFromExtractConfiguration() {
+    BigQuerySchemaTransformReadConfiguration configuration =
+        BigQuerySchemaTransformReadConfiguration.createExtractBuilder("dataset.table").build();
+    Row configurationRow = ROW_SERIALIZABLE_FUNCTION.apply(configuration);
+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    BigQuerySchemaTransformRead transform =
+        (BigQuerySchemaTransformRead) provider.from(configurationRow);
+    assertEquals(configuration.getTableSpec(), transform.getConfiguration().getTableSpec());
+  }
+
+  @Test
+  public void testFromQueryConfiguration() {
+    BigQuerySchemaTransformReadConfiguration want =
+        BigQuerySchemaTransformReadConfiguration.createQueryBuilder("select * from example")
+            .build();
+    Row configurationRow = ROW_SERIALIZABLE_FUNCTION.apply(want);
+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    BigQuerySchemaTransformRead transform =

Review comment:
       Is this testing your transform code at all, though? The most complex part of the code is expand, so it would be good to have a simple test for it. But if we can't get anything with BigQuery working, then that may not be an option. I know you said that it was hard to get the test system running. Are there no good examples of BigQuery tests? I thought you were able to test the transform code without it, but if not, it would be good to add something if possible.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+@AutoValue
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String TAG = "ToRows";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQuerySchemaTransformRead(configuration);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformRead implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformRead(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    BigQuerySchemaTransformReadConfiguration getConfiguration() {
+      return configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformReadTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformReadTransform

Review comment:
       Can we put this at the end of the class so it's not in the middle? It's kind of hard to follow in the middle.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+@AutoValue
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String TAG = "ToRows";

Review comment:
       READ_TAG or OUTPUT_TAG?
   Also, I don't think ToRows really makes sense as an identifier for an output. I would recommend a different name here.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.BigQuerySchemaTransformRead;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final Schema SCHEMA = AUTO_VALUE_SCHEMA.schemaFor(TYPE_DESCRIPTOR);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  @Test
+  public void testFromExtractConfiguration() {
+    BigQuerySchemaTransformReadConfiguration configuration =
+        BigQuerySchemaTransformReadConfiguration.createExtractBuilder("dataset.table").build();
+    Row configurationRow = ROW_SERIALIZABLE_FUNCTION.apply(configuration);
+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    BigQuerySchemaTransformRead transform =
+        (BigQuerySchemaTransformRead) provider.from(configurationRow);
+    assertEquals(configuration.getTableSpec(), transform.getConfiguration().getTableSpec());
+  }
+
+  @Test
+  public void testFromQueryConfiguration() {
+    BigQuerySchemaTransformReadConfiguration want =
+        BigQuerySchemaTransformReadConfiguration.createQueryBuilder("select * from example")
+            .build();
+    Row configurationRow = ROW_SERIALIZABLE_FUNCTION.apply(want);

Review comment:
       Just use your own API here; there is no reason to test the surrounding class.

+    SchemaTransformProvider provider = new BigQuerySchemaTransformReadProvider();
+    BigQuerySchemaTransformRead transform =
+        (BigQuerySchemaTransformRead) provider.from(configurationRow);
+    BigQuerySchemaTransformReadConfiguration got = transform.getConfiguration();
+    assertEquals(want.getQuery(), got.getQuery());
+    assertEquals(want.getUseStandardSql(), got.getUseStandardSql());
+  }
+
+  @Test
+  public void getConfiguration() {

Review comment:
       What are you trying to test here? The TypedSchemaTransformProvider class already has tests, so we may not need this one if it only checks in general that the provider works.
   
   Also, could we move schema generation into this test? The other tests don't need it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r841097703



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformWriteTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWriteTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQuerySchemaTransformWriteTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(toWrite(schema));

Review comment:
       I will actually address checking for the schema in this PR.  I agree.







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r841111018



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {

Review comment:
       Just uploading the state as-is now.  I will combine in future commits.







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r834847151



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/TypedSchemaTransformProvider.java
##########
@@ -42,13 +42,13 @@
 @Experimental(Kind.SCHEMAS)
 public abstract class TypedSchemaTransformProvider<ConfigT> implements SchemaTransformProvider {
 
-  abstract Class<ConfigT> configurationClass();
+  protected abstract Class<ConfigT> configurationClass();

Review comment:
       Making this method protected resolved the "method does not override" compile errors when extending this class from another package.

 
   /**
    * Produce a SchemaTransform from ConfigT. Can throw a {@link InvalidConfigurationException} or a
    * {@link InvalidSchemaException}.
    */
-  abstract SchemaTransform from(ConfigT configuration);
+  protected abstract SchemaTransform from(ConfigT configuration);

Review comment:
       Making this method protected resolved the "method does not override" compile errors when extending this class from another package.
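
   For context, here is a minimal sketch of why the modifier matters. It uses hypothetical stand-in classes (`TypedProviderSketch`, `StringProviderSketch`), not the Beam classes. A Java method declared without an access modifier is package-private, so a subclass in a different package cannot override it; declaring the hooks `protected` makes them overridable. Everything sits in one file here, so the sketch only shows the working `protected` form.

```java
// Hypothetical stand-in for an abstract provider; this is not Beam code.
abstract class TypedProviderSketch<ConfigT> {
  // 'protected' lets subclasses in other packages override these hooks.
  // With no modifier (package-private), a subclass in a different package
  // cannot override them, and an @Override annotation there fails to compile.
  protected abstract Class<ConfigT> configurationClass();

  protected abstract String from(ConfigT configuration);

  // Public entry point that delegates to the overridable hooks.
  public String describe(ConfigT configuration) {
    return configurationClass().getSimpleName() + " -> " + from(configuration);
  }
}

// Hypothetical subclass; in the PR the real subclass lives in another package.
final class StringProviderSketch extends TypedProviderSketch<String> {
  @Override
  protected Class<String> configurationClass() {
    return String.class;
  }

  @Override
  protected String from(String configuration) {
    return "transform(" + configuration + ")";
  }
}
```

   Callers still go through the public method only; the `protected` hooks stay hidden from unrelated classes outside the package.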







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835592220



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
+    if (tableSpec.getProjectId().isEmpty()) {
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",
+            tableSpec.getProjectId(), tableSpec.getDatasetId(), tableSpec.getTableId()));
+  }
+
+  private static Builder defaultBuilder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder()
+        .setJobType(JobType.UNSPECIFIED)
+        .setQuery("")

Review comment:
       @laraschmidt Could we consider reverting to empty String defaults?  Please see https://github.com/apache/beam/pull/17181/files#r835594560







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835584091



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
+    if (tableSpec.getProjectId().isEmpty()) {
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",

Review comment:
       https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.html#parseTableSpec-java.lang.String-  accepts both `project:dataset.table` and `project.dataset.table`.  The BigQuery dataset details view in the Google Cloud console UI displays the ID in the `project:dataset.table` form.
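
   To make the two shapes concrete, here is a minimal sketch of the formatting under discussion. `TableSpecSketch.format` is a hypothetical helper, not part of Beam, and it takes plain strings instead of a `TableReference`. It also treats a null project ID the same as an empty one, which is an assumption on my part rather than something the diff does.

```java
// Hypothetical helper for illustration only; not part of the Beam API.
final class TableSpecSketch {
  private TableSpecSketch() {}

  /**
   * Formats a table spec as "project:dataset.table", or as "dataset.table"
   * when no project is given. Treating null like empty is an assumption.
   */
  static String format(String projectId, String datasetId, String tableId) {
    if (projectId == null || projectId.isEmpty()) {
      return String.format("%s.%s", datasetId, tableId);
    }
    return String.format("%s:%s.%s", projectId, datasetId, tableId);
  }
}
```

   The colon form is used here only because it matches what the console shows; per the javadoc linked above, the dotted form would parse as well.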







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r841097589



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       I would like to have separate tests on the toTypedRead method. I think this, in combination with testing against FakeBigQueryServices, will help with test coverage.
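
One way to structure the two kinds of test mentioned above is sketched here in plain Java, assuming stand-in types and no Beam APIs. `RowSource`, `FakeRowSource`, `describeRead`, and `expand` are hypothetical names that play the roles of `BigQueryServices`, `FakeBigQueryServices`, `toTypedRead`, and the transform's `expand`. The point is only that the dispatch logic can be tested alone, while the full path is tested with an injected fake.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch; every type here is a hypothetical stand-in, not a Beam API. */
final class ReadTestSketch {
  enum JobType { QUERY, EXTRACT, UNSPECIFIED }

  /** Stand-in for BigQueryServices: the dependency a test swaps out. */
  interface RowSource {
    List<String> fetch(String readDescription);
  }

  /** Stand-in for FakeBigQueryServices: returns canned rows without any network call. */
  static final class FakeRowSource implements RowSource {
    private final List<String> cannedRows;

    FakeRowSource(List<String> cannedRows) {
      this.cannedRows = cannedRows;
    }

    @Override
    public List<String> fetch(String readDescription) {
      return cannedRows;
    }
  }

  /** Stand-in for toTypedRead(): pure dispatch on the job type, testable on its own. */
  static String describeRead(JobType jobType, String target) {
    switch (jobType) {
      case QUERY:
        return "query:" + target;
      case EXTRACT:
        return "extract:" + target;
      default:
        throw new IllegalArgumentException("invalid job type for read, got: " + jobType);
    }
  }

  /** Stand-in for expand(): builds the read, then runs it against the injected source. */
  static List<String> expand(JobType jobType, String target, RowSource source) {
    return source.fetch(describeRead(jobType, target));
  }

  public static void main(String[] args) {
    // Test 1: the dispatch logic alone, with no service involved.
    System.out.println(describeRead(JobType.EXTRACT, "dataset.table"));
    // Test 2: the full path, with the fake standing in for the real service.
    RowSource fake = new FakeRowSource(Arrays.asList("row-1", "row-2"));
    System.out.println(expand(JobType.QUERY, "SELECT 1", fake));
  }
}
```

Keeping the dispatch method free of service calls is what makes the first kind of test cheap; the fake is needed only once rows actually have to flow.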




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835592220



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
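+    // getProjectId() returns null when no project is set, so check for null before isEmpty().
+    if (tableSpec.getProjectId() == null || tableSpec.getProjectId().isEmpty()) {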
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",
+            tableSpec.getProjectId(), tableSpec.getDatasetId(), tableSpec.getTableId()));
+  }
+
+  private static Builder defaultBuilder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder()
+        .setJobType(JobType.UNSPECIFIED)
+        .setQuery("")

Review comment:
       @laraschmidt Changing the parameters to nullable leads to the following errors in the Jenkins output. When I used non-nullable parameters that default to the empty string, I didn't get these errors:
   ```
   /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:155: error: [argument.type.incompatible] incompatible argument for parameter tableSpec of from.
   13:26:39       return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
   13:26:39                                                                                  ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:159: error: [argument.type.incompatible] incompatible argument for parameter arg0 of requireNonNull.
   13:26:39       String query = Objects.requireNonNull(configuration.getQuery());
   13:26:39                                                                   ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:164: error: [argument.type.incompatible] incompatible argument for parameter location of withQueryLocation.
   13:26:39         read = read.withQueryLocation(configuration.getQueryLocation());
   13:26:39                                                                     ^
   13:26:39   found   : @Initialized @Nullable String
   13:26:39   required: @Initialized @NonNull String
   13:26:39 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_PVR_Flink_Docker_Commit/src/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java:168: error: [condition.nullable] condition on a possibly-null value (configuration.getUseStandardSql())
   13:26:39         if (configuration.getUseStandardSql()) {
   ```
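
The errors above all come from passing a possibly-null getter result straight into a parameter that requires a non-null value, or from unboxing a possibly-null `Boolean`. A minimal sketch of one common remedy, in plain Java with stand-in types: copy each nullable value into a local variable, test it, and only then use it. `ReadConfig` and `describe` are hypothetical names, and the comments mark where `@Nullable` would appear in the real class. Whether a particular checker accepts this pattern depends on its flow analysis, so this is an illustration and not the fix adopted in the PR.

```java
/** Plain-Java sketch; ReadConfig and describe are hypothetical, not Beam APIs. */
final class NullableConfigSketch {
  /** Stand-in for the configuration; every getter may return null. */
  static final class ReadConfig {
    private final String query; // would be @Nullable
    private final String queryLocation; // would be @Nullable
    private final Boolean useStandardSql; // would be @Nullable

    ReadConfig(String query, String queryLocation, Boolean useStandardSql) {
      this.query = query;
      this.queryLocation = queryLocation;
      this.useStandardSql = useStandardSql;
    }

    String getQuery() {
      return query;
    }

    String getQueryLocation() {
      return queryLocation;
    }

    Boolean getUseStandardSql() {
      return useStandardSql;
    }
  }

  /** Describes a query read, checking each nullable value before it is used. */
  static String describe(ReadConfig configuration) {
    // Copy to a local first: a null check on a local variable is easy for tools to track.
    String query = configuration.getQuery();
    if (query == null || query.isEmpty()) {
      throw new IllegalArgumentException("query is required for a query read");
    }
    StringBuilder read = new StringBuilder("query=").append(query);

    // Optional setting: applied only when present.
    String location = configuration.getQueryLocation();
    if (location != null) {
      read.append(", location=").append(location);
    }

    // Never unbox a possibly-null Boolean; fall back to the default (true) instead.
    Boolean flag = configuration.getUseStandardSql();
    boolean useStandardSql = flag == null || flag;
    if (useStandardSql) {
      read.append(", standardSql");
    }
    return read.toString();
  }

  public static void main(String[] args) {
    System.out.println(describe(new ReadConfig("SELECT 1", null, null)));
    System.out.println(describe(new ReadConfig("SELECT 1", "US", Boolean.FALSE)));
  }
}
```

The fallback to `true` mirrors the `DEFAULT_USE_STANDARD_SQL` constant in the diff; the rest of the shape is an assumption.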







[GitHub] [beam] laraschmidt commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835453436



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {
+
+  private static final boolean DEFAULT_USE_STANDARD_SQL = true;
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} from the SQL query.
+   *
+   * <p>The configuration defaults to useStandardSql=true.
+   */
+  public static Builder createQueryBuilder(String query) {
+    return defaultBuilder().setQuery(query).setJobType(JobType.QUERY);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs. See the getTableSpec() getter for details.
+   */
+  public static Builder createExtractBuilder(String tableSpec) {
+    return defaultBuilder().setTableSpec(tableSpec).setJobType(JobType.EXTRACT);
+  }
+
+  /**
+   * Instantiates a {@link BigQuerySchemaTransformReadConfiguration.Builder} to support BigQuery
+   * extract jobs.
+   */
+  public static Builder createExtractBuilder(TableReference tableSpec) {
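+    // getProjectId() returns null when no project is set, so check for null before isEmpty().
+    if (tableSpec.getProjectId() == null || tableSpec.getProjectId().isEmpty()) {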
+      return createExtractBuilder(
+          String.format("%s.%s", tableSpec.getDatasetId(), tableSpec.getTableId()));
+    }
+    return createExtractBuilder(
+        String.format(
+            "%s:%s.%s",
+            tableSpec.getProjectId(), tableSpec.getDatasetId(), tableSpec.getTableId()));
+  }
+
+  private static Builder defaultBuilder() {
+    return new AutoValue_BigQuerySchemaTransformReadConfiguration.Builder()
+        .setJobType(JobType.UNSPECIFIED)
+        .setQuery("")
+        .setQueryLocation("")
+        .setTableSpec("")
+        .setUseStandardSql(DEFAULT_USE_STANDARD_SQL);
+  }
+
+  /** Configures the BigQuery job type. */
+  abstract JobType getJobType();
+
+  /** Configures the BigQuery read job with the SQL query. */
+  public abstract String getQuery();
+
+  /**
+   * Specifies a table for a BigQuery read job as "[project_id]:[dataset_id].[table_id]" or
+   * "[dataset_id].[table_id]" for tables within the current project.
+   */
+  public abstract String getTableSpec();
+
+  /** BigQuery geographic location where the query job will be executed. */
+  public abstract String getQueryLocation();
+
+  /** Enables BigQuery's Standard SQL dialect when reading from a query. */
+  public abstract Boolean getUseStandardSql();
+
+  /** Instantiates a {@link BigQueryIO.TypedRead} from the configuration. */
+  public BigQueryIO.TypedRead<TableRow> toTypedRead() {

Review comment:
       Would it make sense to put all of this logic into the SchemaTransform and make this class purely a configuration class? Having the logic split between the two is a bit odd.
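
The split suggested here can be sketched in plain Java as a data-only configuration plus a separate class that owns the logic. `ExtractConfig`, `ExtractLogic`, and `toTableSpec` are hypothetical names used for illustration; only the table spec formats `[project_id]:[dataset_id].[table_id]` and `[dataset_id].[table_id]` are taken from the Javadoc in the diff.

```java
/** Plain-Java sketch; ExtractConfig and ExtractLogic are hypothetical, not Beam APIs. */
final class ConfigSplitSketch {
  /** Data only: fields and getters, with no read logic. */
  static final class ExtractConfig {
    private final String projectId; // may be null for the current project
    private final String datasetId;
    private final String tableId;

    ExtractConfig(String projectId, String datasetId, String tableId) {
      this.projectId = projectId;
      this.datasetId = datasetId;
      this.tableId = tableId;
    }

    String getProjectId() {
      return projectId;
    }

    String getDatasetId() {
      return datasetId;
    }

    String getTableId() {
      return tableId;
    }
  }

  /** All behavior lives here, next to the transform that consumes the configuration. */
  static final class ExtractLogic {
    private final ExtractConfig configuration;

    ExtractLogic(ExtractConfig configuration) {
      this.configuration = configuration;
    }

    /** Formats "project:dataset.table", or "dataset.table" when no project is set. */
    String toTableSpec() {
      String projectId = configuration.getProjectId();
      if (projectId == null || projectId.isEmpty()) {
        return String.format(
            "%s.%s", configuration.getDatasetId(), configuration.getTableId());
      }
      return String.format(
          "%s:%s.%s", projectId, configuration.getDatasetId(), configuration.getTableId());
    }
  }

  public static void main(String[] args) {
    System.out.println(new ExtractLogic(new ExtractConfig(null, "dataset", "table")).toTableSpec());
    System.out.println(
        new ExtractLogic(new ExtractConfig("project", "dataset", "table")).toTableSpec());
  }
}
```

With this shape the configuration class can change its fields without touching any read logic, and the logic can be tested by constructing configurations directly.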







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r835655245



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQuerySchemaTransformRead(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformRead implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformRead(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformReadTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQuerySchemaTransformReadTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQuerySchemaTransformReadTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+      PCollection<TableRow> tableRowPCollection = input.getPipeline().apply(toTypedRead());
+      Schema schema = tableRowPCollection.getSchema();
+      PCollection<Row> rowPCollection =
+          tableRowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(Row.class))
+                  .via((tableRow) -> BigQueryUtils.toBeamRow(schema, tableRow)));
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowPCollection);
+    }
+
+    BigQueryIO.TypedRead<TableRow> toTypedRead() {
+      JobType jobType = configuration.getJobType();
+      switch (jobType) {
+        case QUERY:
+          return toQueryTypedRead();
+
+        case EXTRACT:
+          return toExtractTypedRead();
+
+        default:
+          throw new InvalidConfigurationException(
+              String.format("invalid job type for BigQueryIO read, got: %s", jobType));
+      }
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toExtractTypedRead() {
+      return BigQueryIO.readTableRowsWithSchema().from(configuration.getTableSpec());
+    }
+
+    private BigQueryIO.TypedRead<TableRow> toQueryTypedRead() {

Review comment:
       Would this error be resolved by adding a SuppressWarnings annotation related to https://issues.apache.org/jira/browse/BEAM-10402?
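
For reference, a suppression of the kind asked about here might look like the sketch below. The `"nullness"` key is the one used in the later revision of the configuration class quoted in this thread; plain `javac` ignores keys it does not recognize, so the class compiles with or without a nullness checker. `SuppressionSketch` and `lengthOrZero` are hypothetical names, and scoping the annotation to a single method is a suggestion, not what the PR does.

```java
/** Plain-Java sketch; the class and method names are hypothetical. */
final class SuppressionSketch {
  // Scoping the suppression to one method keeps the rest of the class checked.
  @SuppressWarnings("nullness") // TODO: remove once the nullness errors are resolved
  static int lengthOrZero(String maybeNull) {
    // The explicit null test keeps the method safe even with the warning suppressed.
    return maybeNull == null ? 0 : maybeNull.length();
  }

  public static void main(String[] args) {
    System.out.println(lengthOrZero(null));
    System.out.println(lengthOrZero("SELECT 1"));
  }
}
```

A class-level annotation silences the same warnings for every member, which is simpler but hides any new nullness problems introduced later.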







[GitHub] [beam] damondouglas commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
damondouglas commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r841097415



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {

Review comment:
       Thank you, Lara :-).  I like this idea of combining.  I think it will make the code cleaner.  I will make this change.







[GitHub] [beam] laraschmidt commented on a change in pull request #17181: [BEAM-14035] Convert BigQuery SchemaIO to SchemaTransform

Posted by GitBox <gi...@apache.org>.
laraschmidt commented on a change in pull request #17181:
URL: https://github.com/apache/beam/pull/17181#discussion_r840795608



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadConfiguration.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+
+/**
+ * Configuration for reading from BigQuery.
+ *
+ * <p>This class is meant to be used with {@link BigQuerySchemaTransformReadProvider}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class BigQuerySchemaTransformReadConfiguration {

Review comment:
       What are your thoughts on combining this into one class?
   
   class BigQuerySchemaTransformConfiguration {
     public class Read {}
   
     public class Write {}
   }
   
   Maybe Ankur has more thoughts, but this would at least cut down on the number of new files we require.
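
A slightly fuller sketch of the combined class proposed above, in plain Java and without AutoValue. The outer class name carries a `Sketch` suffix to mark it as hypothetical, and the fields (`tableSpec`, `writeDisposition`) are illustrative assumptions. The nested classes are declared `static` here, unlike the `public class Read {}` in the comment; to my understanding AutoValue expects nested value classes to be static, so the real version would likely need that too.

```java
/** Plain-Java sketch of one file holding both configurations; names are hypothetical. */
final class BigQuerySchemaTransformConfigurationSketch {
  private BigQuerySchemaTransformConfigurationSketch() {} // used as a namespace only

  /** Configuration for reading. */
  static final class Read {
    private final String tableSpec;

    Read(String tableSpec) {
      this.tableSpec = tableSpec;
    }

    String getTableSpec() {
      return tableSpec;
    }
  }

  /** Configuration for writing. */
  static final class Write {
    private final String tableSpec;
    private final String writeDisposition; // illustrative, for example "WRITE_APPEND"

    Write(String tableSpec, String writeDisposition) {
      this.tableSpec = tableSpec;
      this.writeDisposition = writeDisposition;
    }

    String getTableSpec() {
      return tableSpec;
    }

    String getWriteDisposition() {
      return writeDisposition;
    }
  }

  public static void main(String[] args) {
    Read read = new Read("dataset.table");
    Write write = new Write("dataset.table", "WRITE_APPEND");
    System.out.println(read.getTableSpec());
    System.out.println(write.getTableSpec() + " " + write.getWriteDisposition());
  }
}
```

Each provider would then reference `...Configuration.Read` or `...Configuration.Write`, so the two configurations stay separate types while living in one file.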

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {

Review comment:
       BigQueryWriteSchemaTransform. It's a schema transform that does a BigQuery write.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       This still doesn't really test toTypedRead, but if it's the best we can do then it's probably okay.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;

Review comment:
       Please add a comment documenting this field.
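
One possible shape for the requested documentation, as a minimal sketch only. `Services` and `ReadTransformSketch` are hypothetical stand-ins for `BigQueryServices` and `PCollectionRowTupleTransform`, and `resolveServices` is an illustrative helper that does not exist in the PR. The point is that the Javadoc states that the field is a test-only override and that null means "use the default".

```java
/** Hypothetical stand-in for a services interface such as BigQueryServices. */
interface Services {
  String name();
}

/** Hypothetical stand-in for the transform that holds the test-only field. */
final class ReadTransformSketch {
  /**
   * Test-only override for the services used by this transform. Left null in production, in which
   * case the default services are used. Tests inject a fake through {@link #setTestServices}.
   */
  private Services testServices = null;

  /** Injects fake services; intended to be called from tests only. */
  void setTestServices(Services testServices) {
    this.testServices = testServices;
  }

  /** Returns the injected services when present, otherwise the supplied default. */
  Services resolveServices(Services defaultServices) {
    return testServices != null ? testServices : defaultServices;
  }
}
```

Keeping the null check in one place makes it easier to see that production behavior is unchanged when nothing is injected.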

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformWriteTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWriteTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQuerySchemaTransformWriteTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(toWrite(schema));

Review comment:
       What happens if the input schema doesn't match the table (when the table already exists)? I assume we'd fail somewhere? If so, that's probably fine for now, but ideally we would do a preemptive check here that the input schema matches the table's schema. Let's at least add a TODO for this.
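
A rough sketch of what such a preemptive check could look like, under loud assumptions: plain `Map<String, String>` values (field name to type name) stand in for the Beam `Schema` and the BigQuery `TableSchema`, and `SchemaPreCheck`, `findMismatches`, and `validate` are hypothetical names, not part of the PR or of Beam. It only illustrates failing fast with a readable message before any rows are written.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: field-name to type-name maps play the role of the real schemas. */
final class SchemaPreCheck {
  private SchemaPreCheck() {}

  /** Returns one message per input field that is missing from, or typed differently in, the table. */
  static List<String> findMismatches(
      Map<String, String> inputFields, Map<String, String> tableFields) {
    List<String> problems = new ArrayList<>();
    for (Map.Entry<String, String> field : inputFields.entrySet()) {
      String tableType = tableFields.get(field.getKey());
      if (tableType == null) {
        problems.add("missing field in table: " + field.getKey());
      } else if (!tableType.equals(field.getValue())) {
        problems.add(
            "type mismatch for " + field.getKey()
                + ": input " + field.getValue() + ", table " + tableType);
      }
    }
    return problems;
  }

  /** Throws before any write is attempted when the two schemas disagree. */
  static void validate(Map<String, String> inputFields, Map<String, String> tableFields) {
    List<String> problems = findMismatches(inputFields, tableFields);
    if (!problems.isEmpty()) {
      throw new IllegalArgumentException("input schema does not match table: " + problems);
    }
  }
}
```

A real check would also need to decide how to treat nullable or extra table columns and nested fields; this sketch deliberately ignores those cases.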

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  private static final String FAKE_PROJECT = "fakeproject";
+  private static final String FAKE_DATASET = "fakedataset";
+  private static final String FAKE_TABLE_ID = "faketable";
+  // private static final String FAKE_QUERY = String.format("select * from `%s.%s.%s`", FAKE_PROJECT,
+  //     FAKE_DATASET, FAKE_TABLE_ID);
+
+  private static final TableReference FAKE_TABLE_REFERENCE = new TableReference()
+      .setProjectId(FAKE_PROJECT)
+      .setDatasetId(FAKE_DATASET)
+      .setTableId(FAKE_TABLE_ID);
+
+  private static final String FAKE_TABLE_SPEC = BigQueryHelpers.toTableSpec(FAKE_TABLE_REFERENCE);
+
+  private static final Schema FAKE_SCHEMA = Schema.of(
+      Field.of("name", FieldType.STRING),
+      Field.of("number", FieldType.INT64)
+  );
+
+  private static final List<TableRow> FAKE_RECORDS = Arrays.asList(
+      new TableRow().set("name", "a").set("number", 1L),
+      new TableRow().set("name", "b").set("number", 2L),
+      new TableRow().set("name", "c").set("number", 3L)
+  );
+
+  private static final List<Row> FAKE_ROWS = Arrays.asList(
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "a")
+          .withFieldValue("number", 1L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "b")
+          .withFieldValue("number", 2L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "c")
+          .withFieldValue("number", 3L)
+          .build()
+  );
+
+  private static final TableSchema FAKE_TABLE_SCHEMA = BigQueryUtils.toTableSchema(FAKE_SCHEMA);
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final Table fakeTable = new Table();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    FakeJobService.setUp();
+    BigQueryIO.clearCreatedTables();
+    fakeTable.setSchema(FAKE_TABLE_SCHEMA);
+    fakeTable.setTableReference(FAKE_TABLE_REFERENCE);
+    fakeTable.setNumBytes(1024L * 1024L);
+    fakeDatasetService.createDataset(FAKE_PROJECT, FAKE_DATASET, "", "", null);
+    fakeDatasetService.createTable(fakeTable);
+    fakeDatasetService.insertAll(fakeTable.getTableReference(), FAKE_RECORDS, null);
+    temporaryFolder.create();
+    OPTIONS.setProject(FAKE_PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testFromExtractConfiguration() {

Review comment:
       testExtract? Also, probably use "read" here instead of "extract".

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProviderTest.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadProvider.PCollectionRowTupleTransform;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test for {@link BigQuerySchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class BigQuerySchemaTransformReadProviderTest {
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<BigQuerySchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(BigQuerySchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<BigQuerySchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  private static final String FAKE_PROJECT = "fakeproject";
+  private static final String FAKE_DATASET = "fakedataset";
+  private static final String FAKE_TABLE_ID = "faketable";
+  // private static final String FAKE_QUERY = String.format("select * from `%s.%s.%s`", FAKE_PROJECT,
+  //     FAKE_DATASET, FAKE_TABLE_ID);
+
+  private static final TableReference FAKE_TABLE_REFERENCE = new TableReference()
+      .setProjectId(FAKE_PROJECT)
+      .setDatasetId(FAKE_DATASET)
+      .setTableId(FAKE_TABLE_ID);
+
+  private static final String FAKE_TABLE_SPEC = BigQueryHelpers.toTableSpec(FAKE_TABLE_REFERENCE);
+
+  private static final Schema FAKE_SCHEMA = Schema.of(
+      Field.of("name", FieldType.STRING),
+      Field.of("number", FieldType.INT64)
+  );
+
+  private static final List<TableRow> FAKE_RECORDS = Arrays.asList(
+      new TableRow().set("name", "a").set("number", 1L),
+      new TableRow().set("name", "b").set("number", 2L),
+      new TableRow().set("name", "c").set("number", 3L)
+  );
+
+  private static final List<Row> FAKE_ROWS = Arrays.asList(
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "a")
+          .withFieldValue("number", 1L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "b")
+          .withFieldValue("number", 2L)
+          .build(),
+      Row.withSchema(FAKE_SCHEMA)
+          .withFieldValue("name", "c")
+          .withFieldValue("number", 3L)
+          .build()
+  );
+
+  private static final TableSchema FAKE_TABLE_SCHEMA = BigQueryUtils.toTableSchema(FAKE_SCHEMA);
+  private static final BigQueryOptions OPTIONS =
+      TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+  private final FakeDatasetService fakeDatasetService = new FakeDatasetService();
+  private final FakeJobService fakeJobService = new FakeJobService();
+  private final Table fakeTable = new Table();
+  private final TemporaryFolder temporaryFolder = new TemporaryFolder();
+  private final FakeBigQueryServices fakeBigQueryServices =
+      new FakeBigQueryServices()
+          .withJobService(fakeJobService)
+          .withDatasetService(fakeDatasetService);
+
+  @Before
+  public void setUp() throws IOException, InterruptedException {
+    FakeDatasetService.setUp();
+    FakeJobService.setUp();
+    BigQueryIO.clearCreatedTables();
+    fakeTable.setSchema(FAKE_TABLE_SCHEMA);
+    fakeTable.setTableReference(FAKE_TABLE_REFERENCE);
+    fakeTable.setNumBytes(1024L * 1024L);
+    fakeDatasetService.createDataset(FAKE_PROJECT, FAKE_DATASET, "", "", null);
+    fakeDatasetService.createTable(fakeTable);
+    fakeDatasetService.insertAll(fakeTable.getTableReference(), FAKE_RECORDS, null);
+    temporaryFolder.create();
+    OPTIONS.setProject(FAKE_PROJECT);
+    OPTIONS.setTempLocation(temporaryFolder.getRoot().getAbsolutePath());
+  }
+
+  @After
+  public void tearDown() {
+    temporaryFolder.delete();
+  }
+
+  @Rule
+  public transient TestPipeline p = TestPipeline.fromOptions(OPTIONS);
+
+  @Test
+  public void testFromExtractConfiguration() {

Review comment:
       testExtract?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       We should at least manually test that it works in a pipeline, though.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {

Review comment:
       BigQueryWriteSchemaTransform: it's a schema transform that does a BigQuery write.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformReadProvider.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySchemaTransformReadConfiguration.JobType;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery read jobs configured using
+ * {@link BigQuerySchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformReadConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformReadConfiguration> configurationClass() {
+    return BigQuerySchemaTransformReadConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformReadConfiguration configuration) {
+    return new BigQueryReadSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class BigQueryReadSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+
+    BigQueryReadSchemaTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new PCollectionRowTupleTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery read jobs configured using {@link
+   * BigQuerySchemaTransformReadConfiguration}.
+   */
+  static class PCollectionRowTupleTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+
+    private final BigQuerySchemaTransformReadConfiguration configuration;
+    private BigQueryServices testBigQueryServices = null;
+
+    PCollectionRowTupleTransform(BigQuerySchemaTransformReadConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    void setTestBigQueryServices(BigQueryServices testBigQueryServices) {
+      this.testBigQueryServices = testBigQueryServices;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      BigQueryIO.TypedRead<TableRow> read = toTypedRead();
+      if (testBigQueryServices != null) {
+        read = read.withTestServices(testBigQueryServices).withoutValidation();

Review comment:
       This still doesn't really test toTypedRead, but if it's the best we can do then it's probably okay.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaTransformWriteProvider.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for BigQuery write jobs configured
+ * using {@link BigQuerySchemaTransformWriteConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class BigQuerySchemaTransformWriteProvider
+    extends TypedSchemaTransformProvider<BigQuerySchemaTransformWriteConfiguration> {
+
+  private static final String API = "bigquery";
+  private static final String VERSION = "v2";
+  private static final String INPUT_TAG = "INPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<BigQuerySchemaTransformWriteConfiguration> configurationClass() {
+    return BigQuerySchemaTransformWriteConfiguration.class;
+  }
+
+  /** Returns the expected {@link SchemaTransform} of the configuration. */
+  @Override
+  protected SchemaTransform from(BigQuerySchemaTransformWriteConfiguration configuration) {
+    return new BigQueryWriteSchemaTransform(configuration);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:%s", API, VERSION);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
+   * single input is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.singletonList(INPUT_TAG);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * no output is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQueryWriteSchemaTransform implements SchemaTransform {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQueryWriteSchemaTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return new BigQuerySchemaTransformWriteTransform(configuration);
+    }
+  }
+
+  /**
+   * An implementation of {@link PTransform} for BigQuery write jobs configured using {@link
+   * BigQuerySchemaTransformWriteConfiguration}.
+   */
+  static class BigQuerySchemaTransformWriteTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
+    private final BigQuerySchemaTransformWriteConfiguration configuration;
+
+    BigQuerySchemaTransformWriteTransform(BigQuerySchemaTransformWriteConfiguration configuration) {
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.has(INPUT_TAG)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s is missing expected tag: %s",
+                getClass().getSimpleName(), input.getClass().getSimpleName(), INPUT_TAG));
+      }
+      PCollection<Row> rowPCollection = input.get(INPUT_TAG);
+      Schema schema = rowPCollection.getSchema();
+      PCollection<TableRow> tableRowPCollection =
+          rowPCollection.apply(
+              MapElements.into(TypeDescriptor.of(TableRow.class)).via(BigQueryUtils::toTableRow));
+      tableRowPCollection.apply(toWrite(schema));

Review comment:
       What happens if the input schema doesn't match the table's schema when the table already exists? I assume we'd fail somewhere, and if so, that's probably fine for now. Ideally, though, we would check up front here that the input schema matches the table's. Let's at least add a TODO for this, unless the old implementation only ever created tables. Did it ever write to an existing table?
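
       A minimal sketch of what such an up-front check might look like, assuming both schemas have already been reduced to maps from field name to a type name such as "STRING" or "INT64". `SchemaPrecheck` and `findMismatches` are hypothetical names, not part of this PR or of Beam. In the transform itself the input side would presumably come from `rowPCollection.getSchema()` and the table side from the existing table's schema; fetching and converting those is not shown. The comparison is deliberately strict, which is also an assumption: a real check might tolerate table columns that are absent from the input if they are nullable.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of the PR or of Beam. */
final class SchemaPrecheck {
  private SchemaPrecheck() {}

  /**
   * Compares the input fields against the existing table's fields and returns one message per
   * problem found. An empty list means the two schemas match exactly (names and type names).
   */
  static List<String> findMismatches(Map<String, String> input, Map<String, String> table) {
    List<String> problems = new ArrayList<>();

    // Every input field must exist in the table with the same type name.
    for (Map.Entry<String, String> field : input.entrySet()) {
      String tableType = table.get(field.getKey());
      if (tableType == null) {
        problems.add("field not in table: " + field.getKey());
      } else if (!tableType.equals(field.getValue())) {
        problems.add(
            String.format(
                "type mismatch for %s: input %s, table %s",
                field.getKey(), field.getValue(), tableType));
      }
    }

    // Strict mode (an assumption): table fields absent from the input are also reported.
    for (String name : table.keySet()) {
      if (!input.containsKey(name)) {
        problems.add("field missing from input: " + name);
      }
    }
    return problems;
  }

  public static void main(String[] args) {
    Map<String, String> input = new LinkedHashMap<>();
    input.put("name", "STRING");
    input.put("number", "STRING");

    Map<String, String> table = new LinkedHashMap<>();
    table.put("name", "STRING");
    table.put("number", "INT64");

    // Print each problem; a caller in expand() could instead throw if the list is non-empty.
    for (String problem : findMismatches(input, table)) {
      System.out.println(problem);
    }
  }
}
```

       Returning a list of messages rather than throwing on the first mismatch lets the caller report every problem in a single exception, which tends to be friendlier when a schema has drifted in several places.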



