Posted to commits@beam.apache.org by re...@apache.org on 2019/06/07 20:42:09 UTC
[beam] branch master updated: Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b24e82c Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads
b24e82c is described below
commit b24e82cc40e732ea6cff023650cc2b83cf14f32a
Author: Charith Ellawala <ch...@users.noreply.github.com>
AuthorDate: Fri Jun 7 21:41:52 2019 +0100
Merge pull request #8620: [BEAM-6673] Add schema support to BigQuery reads
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 91 ++++++++++---
.../sdk/io/gcp/bigquery/BigQueryQuerySource.java | 103 ++------------
...uerySource.java => BigQueryQuerySourceDef.java} | 107 +++++++--------
.../bigquery/BigQuerySchemaRetrievalException.java | 25 ++++
.../sdk/io/gcp/bigquery/BigQuerySourceDef.java | 52 ++++++++
.../sdk/io/gcp/bigquery/BigQueryTableSource.java | 61 ++-------
...ableSource.java => BigQueryTableSourceDef.java} | 88 +++++-------
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 131 +++++++++++++++++-
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 148 +++++++++++++--------
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 109 +++++++++++++++
10 files changed, 598 insertions(+), 317 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index a66c903..5e6e59f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
@@ -100,6 +101,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -405,6 +407,14 @@ public class BigQueryIO {
return read(new TableRowParser()).withCoder(TableRowJsonCoder.of());
}
+ /** Like {@link #readTableRows()} but with {@link Schema} support. */
+ public static TypedRead<TableRow> readTableRowsWithSchema() {
+ return read(new TableRowParser())
+ .withCoder(TableRowJsonCoder.of())
+ .withBeamRowConverters(
+ BigQueryUtils.tableRowToBeamRow(), BigQueryUtils.tableRowFromBeamRow());
+ }
+
/**
* Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
* each row of the table or query result, parsed from the BigQuery AVRO format using the specified
@@ -593,6 +603,12 @@ public class BigQueryIO {
DIRECT_READ,
}
+ interface ToBeamRowFunction<T>
+ extends SerializableFunction<Schema, SerializableFunction<T, Row>> {}
+
+ interface FromBeamRowFunction<T>
+ extends SerializableFunction<Schema, SerializableFunction<Row, T>> {}
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -628,6 +644,12 @@ public class BigQueryIO {
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setKmsKey(String kmsKey);
+
+ @Experimental(Experimental.Kind.SCHEMAS)
+ abstract Builder<T> setToBeamRowFn(ToBeamRowFunction<T> toRowFn);
+
+ @Experimental(Experimental.Kind.SCHEMAS)
+ abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);
}
@Nullable
@@ -669,6 +691,14 @@ public class BigQueryIO {
@Nullable
abstract String getKmsKey();
+ @Nullable
+ @Experimental(Experimental.Kind.SCHEMAS)
+ abstract ToBeamRowFunction<T> getToBeamRowFn();
+
+ @Nullable
+ @Experimental(Experimental.Kind.SCHEMAS)
+ abstract FromBeamRowFunction<T> getFromBeamRowFn();
+
/**
* An enumeration type for the priority of a query.
*
@@ -709,27 +739,22 @@ public class BigQueryIO {
}
}
- private BigQuerySourceBase<T> createSource(String jobUuid, Coder<T> coder) {
- BigQuerySourceBase<T> source;
+ private BigQuerySourceDef createSourceDef() {
+ BigQuerySourceDef sourceDef;
if (getQuery() == null) {
- source =
- BigQueryTableSource.create(
- jobUuid, getTableProvider(), getBigQueryServices(), coder, getParseFn());
+ sourceDef = BigQueryTableSourceDef.create(getBigQueryServices(), getTableProvider());
} else {
- source =
- BigQueryQuerySource.create(
- jobUuid,
+ sourceDef =
+ BigQueryQuerySourceDef.create(
+ getBigQueryServices(),
getQuery(),
getFlattenResults(),
getUseLegacySql(),
- getBigQueryServices(),
- coder,
- getParseFn(),
MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
getQueryLocation(),
getKmsKey());
}
- return source;
+ return sourceDef;
}
private BigQueryStorageQuerySource<T> createStorageQuerySource(
@@ -840,6 +865,12 @@ public class BigQueryIO {
}
checkArgument(getParseFn() != null, "A parseFn is required");
+ // if both toRowFn and fromRowFn values are set, enable Beam schema support
+ boolean beamSchemaEnabled = false;
+ if (getToBeamRowFn() != null && getFromBeamRowFn() != null) {
+ beamSchemaEnabled = true;
+ }
+
Pipeline p = input.getPipeline();
final Coder<T> coder = inferCoder(p.getCoderRegistry());
@@ -852,6 +883,7 @@ public class BigQueryIO {
"Invalid BigQueryIO.Read: Specifies table read options, "
+ "which only applies when using Method.DIRECT_READ");
+ final BigQuerySourceDef sourceDef = createSourceDef();
final PCollectionView<String> jobIdTokenView;
PCollection<String> jobIdTokenCollection;
PCollection<T> rows;
@@ -862,7 +894,10 @@ public class BigQueryIO {
p.apply("TriggerIdCreation", Create.of(staticJobUuid))
.apply("ViewId", View.asSingleton());
// Apply the traditional Source model.
- rows = p.apply(org.apache.beam.sdk.io.Read.from(createSource(staticJobUuid, coder)));
+ rows =
+ p.apply(
+ org.apache.beam.sdk.io.Read.from(
+ sourceDef.toSource(staticJobUuid, coder, getParseFn())));
} else {
// Create a singleton job ID token at execution time.
jobIdTokenCollection =
@@ -888,7 +923,8 @@ public class BigQueryIO {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String jobUuid = c.element();
- BigQuerySourceBase<T> source = createSource(jobUuid, coder);
+ BigQuerySourceBase<T> source =
+ sourceDef.toSource(jobUuid, coder, getParseFn());
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
ExtractResult res = source.extractFiles(options);
@@ -919,7 +955,8 @@ public class BigQueryIO {
BigQueryHelpers.fromJsonString(
c.sideInput(schemaView), TableSchema.class);
String jobUuid = c.sideInput(jobIdTokenView);
- BigQuerySourceBase<T> source = createSource(jobUuid, coder);
+ BigQuerySourceBase<T> source =
+ sourceDef.toSource(jobUuid, coder, getParseFn());
List<BoundedSource<T>> sources =
source.createSources(
ImmutableList.of(
@@ -966,7 +1003,18 @@ public class BigQueryIO {
}
}
};
- return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
+
+ rows = rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
+
+ if (beamSchemaEnabled) {
+ BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
+ Schema beamSchema = sourceDef.getBeamSchema(bqOptions);
+ SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
+ SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);
+
+ rows.setSchema(beamSchema, toBeamRow, fromBeamRow);
+ }
+ return rows;
}
private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
@@ -1201,6 +1249,17 @@ public class BigQueryIO {
return toBuilder().setKmsKey(kmsKey).build();
}
+ /**
+ * Sets the functions to convert elements to/from {@link Row} objects.
+ *
+ * <p>Setting these conversion functions is necessary to enable {@link Schema} support.
+ */
+ @Experimental(Experimental.Kind.SCHEMAS)
+ public TypedRead<T> withBeamRowConverters(
+ ToBeamRowFunction<T> toRowFn, FromBeamRowFunction<T> fromRowFn) {
+ return toBuilder().setToBeamRowFn(toRowFn).setFromBeamRowFn(fromRowFn).build();
+ }
+
/** See {@link Read#from(String)}. */
public TypedRead<T> from(String tableSpec) {
return from(StaticValueProvider.of(tableSpec));
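For context, a minimal pipeline sketch using the new readTableRowsWithSchema() entry point added above; the project, dataset, table, and column names ("my-project:my_dataset.my_table", "name") are placeholders, and the call pattern mirrors the BigQueryIOReadTest changes further down:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.schemas.transforms.Select;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    public class ReadTableRowsWithSchemaExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // readTableRowsWithSchema() wires TableRowParser together with the
        // BigQueryUtils row converters, so the output PCollection carries a
        // Beam Schema derived from the table's BigQuery schema.
        PCollection<TableRow> rows =
            p.apply(
                BigQueryIO.readTableRowsWithSchema()
                    .from("my-project:my_dataset.my_table")); // placeholder table spec

        // Schema-aware transforms can now be applied directly.
        PCollection<Row> names = rows.apply(Select.fieldNames("name")); // placeholder column

        p.run().waitUntilFinish();
      }
    }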
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 375cc4f..f2a70da 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -17,20 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
-import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.TableReference;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
@@ -45,115 +35,44 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
static <T> BigQueryQuerySource<T> create(
String stepUuid,
- ValueProvider<String> query,
- Boolean flattenResults,
- Boolean useLegacySql,
+ BigQueryQuerySourceDef queryDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
- QueryPriority priority,
- String location,
- String kmsKey) {
- return new BigQueryQuerySource<>(
- stepUuid,
- query,
- flattenResults,
- useLegacySql,
- bqServices,
- coder,
- parseFn,
- priority,
- location,
- kmsKey);
+ SerializableFunction<SchemaAndRecord, T> parseFn) {
+ return new BigQueryQuerySource<>(stepUuid, queryDef, bqServices, coder, parseFn);
}
- private final ValueProvider<String> query;
- private final Boolean flattenResults;
- private final Boolean useLegacySql;
- private final QueryPriority priority;
- private final String location;
- private final String kmsKey;
-
- private transient AtomicReference<JobStatistics> dryRunJobStats;
+ private final BigQueryQuerySourceDef queryDef;
private BigQueryQuerySource(
String stepUuid,
- ValueProvider<String> query,
- Boolean flattenResults,
- Boolean useLegacySql,
+ BigQueryQuerySourceDef queryDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
- QueryPriority priority,
- String location,
- String kmsKey) {
+ SerializableFunction<SchemaAndRecord, T> parseFn) {
super(stepUuid, bqServices, coder, parseFn);
- this.query = checkNotNull(query, "query");
- this.flattenResults = checkNotNull(flattenResults, "flattenResults");
- this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
- this.priority = priority;
- this.location = location;
- this.kmsKey = kmsKey;
- dryRunJobStats = new AtomicReference<>();
- }
-
- /**
- * Since the query helper reference is declared as transient, neither the AtomicReference nor the
- * structure it refers to are persisted across serialization boundaries. The code below is
- * resilient to the QueryHelper object disappearing in between method calls, but the reference
- * object must be recreated at deserialization time.
- */
- private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
- in.defaultReadObject();
- dryRunJobStats = new AtomicReference<>();
+ this.queryDef = queryDef;
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return BigQueryQueryHelper.dryRunQueryIfNeeded(
- bqServices,
- options.as(BigQueryOptions.class),
- dryRunJobStats,
- query.get(),
- flattenResults,
- useLegacySql,
- location)
- .getQuery()
- .getTotalBytesProcessed();
+ return queryDef.getEstimatedSizeBytes(options.as(BigQueryOptions.class));
}
@Override
protected TableReference getTableToExtract(BigQueryOptions bqOptions)
throws IOException, InterruptedException {
- return BigQueryQueryHelper.executeQuery(
- bqServices,
- bqOptions,
- dryRunJobStats,
- stepUuid,
- query.get(),
- flattenResults,
- useLegacySql,
- priority,
- location,
- kmsKey);
+ return queryDef.getTableReference(bqOptions, stepUuid);
}
@Override
protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
- TableReference tableToRemove =
- createTempTableReference(
- bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
-
- DatasetService tableService = bqServices.getDatasetService(bqOptions);
- LOG.info("Deleting temporary table with query results {}", tableToRemove);
- tableService.deleteTable(tableToRemove);
- LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
- tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+ queryDef.cleanupTempResource(bqOptions, stepUuid);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("query", query));
+ builder.add(DisplayData.item("query", queryDef.getQuery()));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
similarity index 67%
copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 375cc4f..1f2366f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -23,74 +23,54 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** A {@link BigQuerySourceBase} for querying BigQuery tables. */
-@VisibleForTesting
-class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
-
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySource.class);
-
- static <T> BigQueryQuerySource<T> create(
- String stepUuid,
- ValueProvider<String> query,
- Boolean flattenResults,
- Boolean useLegacySql,
- BigQueryServices bqServices,
- Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
- QueryPriority priority,
- String location,
- String kmsKey) {
- return new BigQueryQuerySource<>(
- stepUuid,
- query,
- flattenResults,
- useLegacySql,
- bqServices,
- coder,
- parseFn,
- priority,
- location,
- kmsKey);
- }
+class BigQueryQuerySourceDef implements BigQuerySourceDef {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryQuerySourceDef.class);
+ private final BigQueryServices bqServices;
private final ValueProvider<String> query;
private final Boolean flattenResults;
private final Boolean useLegacySql;
- private final QueryPriority priority;
+ private final BigQueryIO.TypedRead.QueryPriority priority;
private final String location;
private final String kmsKey;
private transient AtomicReference<JobStatistics> dryRunJobStats;
- private BigQueryQuerySource(
- String stepUuid,
+ static BigQueryQuerySourceDef create(
+ BigQueryServices bqServices,
ValueProvider<String> query,
Boolean flattenResults,
Boolean useLegacySql,
+ BigQueryIO.TypedRead.QueryPriority priority,
+ String location,
+ String kmsKey) {
+ return new BigQueryQuerySourceDef(
+ bqServices, query, flattenResults, useLegacySql, priority, location, kmsKey);
+ }
+
+ private BigQueryQuerySourceDef(
BigQueryServices bqServices,
- Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
- QueryPriority priority,
+ ValueProvider<String> query,
+ Boolean flattenResults,
+ Boolean useLegacySql,
+ BigQueryIO.TypedRead.QueryPriority priority,
String location,
String kmsKey) {
- super(stepUuid, bqServices, coder, parseFn);
this.query = checkNotNull(query, "query");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
+ this.bqServices = bqServices;
this.priority = priority;
this.location = location;
this.kmsKey = kmsKey;
@@ -108,11 +88,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
dryRunJobStats = new AtomicReference<>();
}
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ long getEstimatedSizeBytes(BigQueryOptions bqOptions) throws Exception {
return BigQueryQueryHelper.dryRunQueryIfNeeded(
bqServices,
- options.as(BigQueryOptions.class),
+ bqOptions,
dryRunJobStats,
query.get(),
flattenResults,
@@ -122,8 +101,7 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
.getTotalBytesProcessed();
}
- @Override
- protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+ TableReference getTableReference(BigQueryOptions bqOptions, String stepUuid)
throws IOException, InterruptedException {
return BigQueryQueryHelper.executeQuery(
bqServices,
@@ -138,22 +116,47 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
kmsKey);
}
- @Override
- protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ void cleanupTempResource(BigQueryOptions bqOptions, String stepUuid) throws Exception {
TableReference tableToRemove =
createTempTableReference(
bqOptions.getProject(), createJobIdToken(bqOptions.getJobName(), stepUuid));
- DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions);
LOG.info("Deleting temporary table with query results {}", tableToRemove);
tableService.deleteTable(tableToRemove);
LOG.info("Deleting temporary dataset with query results {}", tableToRemove.getDatasetId());
tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
}
+ /** {@inheritDoc} */
+ @Override
+ public <T> BigQuerySourceBase<T> toSource(
+ String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
+ return BigQueryQuerySource.create(stepUuid, this, bqServices, coder, parseFn);
+ }
+
+ /** {@inheritDoc} */
@Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("query", query));
+ public Schema getBeamSchema(BigQueryOptions bqOptions) {
+ try {
+ JobStatistics stats =
+ BigQueryQueryHelper.dryRunQueryIfNeeded(
+ bqServices,
+ bqOptions,
+ dryRunJobStats,
+ query.get(),
+ flattenResults,
+ useLegacySql,
+ location);
+ TableSchema tableSchema = stats.getQuery().getSchema();
+ return BigQueryUtils.fromTableSchema(tableSchema);
+ } catch (IOException | InterruptedException | NullPointerException e) {
+ throw new BigQuerySchemaRetrievalException(
+ "Exception while trying to retrieve schema of query", e);
+ }
+ }
+
+ ValueProvider<String> getQuery() {
+ return query;
}
}
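For query sources, getBeamSchema resolves the schema from a BigQuery dry-run job, so no query results are materialized at pipeline construction time. A rough sketch of the package-private call pattern, assuming bqServices (a BigQueryServices instance) and options (PipelineOptions) are in scope and the query text is a placeholder:

    BigQueryQuerySourceDef queryDef =
        BigQueryQuerySourceDef.create(
            bqServices,
            ValueProvider.StaticValueProvider.of("SELECT name, number FROM `my_dataset.my_table`"),
            true /* flattenResults */,
            false /* useLegacySql */,
            BigQueryIO.TypedRead.QueryPriority.BATCH,
            null /* location */,
            null /* kmsKey */);

    // Runs (or reuses) a dry-run job and converts the returned TableSchema
    // via BigQueryUtils.fromTableSchema; failures surface as
    // BigQuerySchemaRetrievalException.
    Schema beamSchema = queryDef.getBeamSchema(options.as(BigQueryOptions.class));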
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java
new file mode 100644
index 0000000..2736e56
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySchemaRetrievalException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/** Exception to signal that BigQuery schema retrieval failed. */
+public class BigQuerySchemaRetrievalException extends RuntimeException {
+ BigQuerySchemaRetrievalException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
new file mode 100644
index 0000000..0f3de1d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+/**
+ * Represents a source used for {@link BigQueryIO#read(SerializableFunction)}. Currently this could
+ * be either a table or a query. Direct read sources are not yet supported.
+ */
+interface BigQuerySourceDef extends Serializable {
+ /**
+ * Convert this source definition into a concrete source implementation.
+ *
+ * @param stepUuid Job UUID
+ * @param coder Coder
+ * @param parseFn Parse function
+ * @param <T> Type of the resulting PCollection
+ * @return An implementation of {@link BigQuerySourceBase}
+ */
+ <T> BigQuerySourceBase<T> toSource(
+ String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn);
+
+ /**
+ * Extract the Beam {@link Schema} corresponding to this source.
+ *
+ * @param bqOptions BigQueryOptions
+ * @return Beam schema of the source
+ * @throws BigQuerySchemaRetrievalException if schema retrieval fails
+ */
+ @Experimental(Experimental.Kind.SCHEMAS)
+ Schema getBeamSchema(BigQueryOptions bqOptions);
+}
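With this interface in place, TypedRead.expand decides once which definition to build and defers everything else; a sketch of the resulting call pattern (package-private API, with bqServices, tableProvider, jobUuid, and options assumed in scope):

    // Table case; the query case builds a BigQueryQuerySourceDef instead.
    BigQuerySourceDef sourceDef = BigQueryTableSourceDef.create(bqServices, tableProvider);

    // Concrete sources are only materialized once the job UUID is known.
    BigQuerySourceBase<TableRow> source =
        sourceDef.toSource(jobUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);

    // Resolved only when both Beam row conversion functions are set.
    Schema beamSchema = sourceDef.getBeamSchema(options.as(BigQueryOptions.class));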
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index f8ea5e1..4334f7e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -17,22 +17,15 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
-
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,69 +36,41 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
static <T> BigQueryTableSource<T> create(
String stepUuid,
- ValueProvider<TableReference> table,
+ BigQueryTableSourceDef tableDef,
BigQueryServices bqServices,
Coder<T> coder,
SerializableFunction<SchemaAndRecord, T> parseFn) {
- return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn);
+ return new BigQueryTableSource<>(stepUuid, tableDef, bqServices, coder, parseFn);
}
- private final ValueProvider<String> jsonTable;
+ private final BigQueryTableSourceDef tableDef;
private final AtomicReference<Long> tableSizeBytes;
private BigQueryTableSource(
String stepUuid,
- ValueProvider<TableReference> table,
+ BigQueryTableSourceDef tableDef,
BigQueryServices bqServices,
Coder<T> coder,
SerializableFunction<SchemaAndRecord, T> parseFn) {
super(stepUuid, bqServices, coder, parseFn);
- this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
+ this.tableDef = tableDef;
this.tableSizeBytes = new AtomicReference<>();
}
@Override
protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
- TableReference tableReference =
- BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
- return setDefaultProjectIfAbsent(bqOptions, tableReference);
- }
-
- /**
- * Sets the {@link TableReference#projectId} of the provided table reference to the id of the
- * default project if the table reference does not have a project ID specified.
- */
- private TableReference setDefaultProjectIfAbsent(
- BigQueryOptions bqOptions, TableReference tableReference) {
- if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
- checkState(
- !Strings.isNullOrEmpty(bqOptions.getProject()),
- "No project ID set in %s or %s, cannot construct a complete %s",
- TableReference.class.getSimpleName(),
- BigQueryOptions.class.getSimpleName(),
- TableReference.class.getSimpleName());
- LOG.info(
- "Project ID not set in {}. Using default project from {}.",
- TableReference.class.getSimpleName(),
- BigQueryOptions.class.getSimpleName());
- tableReference.setProjectId(bqOptions.getProject());
- }
- return tableReference;
+ return tableDef.getTableReference(bqOptions);
}
@Override
public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
if (tableSizeBytes.get() == null) {
- TableReference table =
- setDefaultProjectIfAbsent(
- options.as(BigQueryOptions.class),
- BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
-
- Table tableRef =
- bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table);
- Long numBytes = tableRef.getNumBytes();
- if (tableRef.getStreamingBuffer() != null) {
- numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue();
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableRef = tableDef.getTableReference(bqOptions);
+ Table table = bqServices.getDatasetService(bqOptions).getTable(tableRef);
+ Long numBytes = table.getNumBytes();
+ if (table.getStreamingBuffer() != null) {
+ numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
}
tableSizeBytes.compareAndSet(null, numBytes);
@@ -121,6 +86,6 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("table", jsonTable));
+ builder.add(DisplayData.item("table", tableDef.getJsonTable()));
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
similarity index 52%
copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index f8ea5e1..07159af 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -20,52 +20,37 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
-import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** A {@link BigQuerySourceBase} for reading BigQuery tables. */
-@VisibleForTesting
-class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
- private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
-
- static <T> BigQueryTableSource<T> create(
- String stepUuid,
- ValueProvider<TableReference> table,
- BigQueryServices bqServices,
- Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
- return new BigQueryTableSource<>(stepUuid, table, bqServices, coder, parseFn);
- }
+class BigQueryTableSourceDef implements BigQuerySourceDef {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSourceDef.class);
+ private final BigQueryServices bqServices;
private final ValueProvider<String> jsonTable;
- private final AtomicReference<Long> tableSizeBytes;
- private BigQueryTableSource(
- String stepUuid,
- ValueProvider<TableReference> table,
- BigQueryServices bqServices,
- Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn) {
- super(stepUuid, bqServices, coder, parseFn);
- this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
- this.tableSizeBytes = new AtomicReference<>();
+ static BigQueryTableSourceDef create(
+ BigQueryServices bqServices, ValueProvider<TableReference> table) {
+ ValueProvider<String> jsonTable =
+ ValueProvider.NestedValueProvider.of(
+ checkNotNull(table, "table"), new BigQueryHelpers.TableRefToJson());
+ return new BigQueryTableSourceDef(bqServices, jsonTable);
}
- @Override
- protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+ private BigQueryTableSourceDef(BigQueryServices bqServices, ValueProvider<String> jsonTable) {
+ this.bqServices = bqServices;
+ this.jsonTable = jsonTable;
+ }
+
+ TableReference getTableReference(BigQueryOptions bqOptions) throws IOException {
TableReference tableReference =
BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
return setDefaultProjectIfAbsent(bqOptions, tableReference);
@@ -93,34 +78,27 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
return tableReference;
}
- @Override
- public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- if (tableSizeBytes.get() == null) {
- TableReference table =
- setDefaultProjectIfAbsent(
- options.as(BigQueryOptions.class),
- BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class));
-
- Table tableRef =
- bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table);
- Long numBytes = tableRef.getNumBytes();
- if (tableRef.getStreamingBuffer() != null) {
- numBytes += tableRef.getStreamingBuffer().getEstimatedBytes().longValue();
- }
-
- tableSizeBytes.compareAndSet(null, numBytes);
- }
- return tableSizeBytes.get();
+ ValueProvider<String> getJsonTable() {
+ return jsonTable;
}
+ /** {@inheritDoc} */
@Override
- protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
- // Do nothing.
+ public <T> BigQuerySourceBase<T> toSource(
+ String stepUuid, Coder<T> coder, SerializableFunction<SchemaAndRecord, T> parseFn) {
+ return BigQueryTableSource.create(stepUuid, this, bqServices, coder, parseFn);
}
+ /** {@inheritDoc} */
@Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("table", jsonTable));
+ public Schema getBeamSchema(BigQueryOptions bqOptions) {
+ try {
+ TableReference tableRef = getTableReference(bqOptions);
+ TableSchema tableSchema =
+ bqServices.getDatasetService(bqOptions).getTable(tableRef).getSchema();
+ return BigQueryUtils.fromTableSchema(tableSchema);
+ } catch (IOException | InterruptedException | NullPointerException e) {
+ throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e);
+ }
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index bd1fda3..1a87875 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -31,10 +31,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.schemas.LogicalTypes;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -130,7 +132,7 @@ public class BigQueryUtils {
.put("SqlDateType", StandardSQLTypeName.DATE)
.put("SqlTimeType", StandardSQLTypeName.TIME)
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
- .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP)
+ .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.DATETIME)
.put("SqlCharType", StandardSQLTypeName.STRING)
.build();
@@ -149,6 +151,79 @@ public class BigQueryUtils {
return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
}
+ /**
+ * Get the Beam {@link FieldType} from a BigQuery type name.
+ *
+ * <p>Supports both standard and legacy SQL types.
+ *
+ * @param typeName Name of the type
+ * @param nestedFields Nested fields for the given type (eg. RECORD type)
+ * @return Corresponding Beam {@link FieldType}
+ */
+ private static FieldType fromTableFieldSchemaType(
+ String typeName, List<TableFieldSchema> nestedFields) {
+ switch (typeName) {
+ case "STRING":
+ return FieldType.STRING;
+ case "BYTES":
+ return FieldType.BYTES;
+ case "INT64":
+ case "INTEGER":
+ return FieldType.INT64;
+ case "FLOAT64":
+ case "FLOAT":
+ return FieldType.DOUBLE;
+ case "BOOL":
+ case "BOOLEAN":
+ return FieldType.BOOLEAN;
+ case "TIMESTAMP":
+ return FieldType.DATETIME;
+ case "TIME":
+ return FieldType.logicalType(
+ new LogicalTypes.PassThroughLogicalType<Instant>(
+ "SqlTimeType", "", FieldType.DATETIME) {});
+ case "DATE":
+ return FieldType.logicalType(
+ new LogicalTypes.PassThroughLogicalType<Instant>(
+ "SqlDateType", "", FieldType.DATETIME) {});
+ case "DATETIME":
+ return FieldType.logicalType(
+ new LogicalTypes.PassThroughLogicalType<Instant>(
+ "SqlTimestampWithLocalTzType", "", FieldType.DATETIME) {});
+ case "STRUCT":
+ case "RECORD":
+ Schema rowSchema = fromTableFieldSchema(nestedFields);
+ return FieldType.row(rowSchema);
+ default:
+ throw new UnsupportedOperationException(
+ "Converting BigQuery type " + typeName + " to Beam type is unsupported");
+ }
+ }
+
+ private static Schema fromTableFieldSchema(List<TableFieldSchema> tableFieldSchemas) {
+ Schema.Builder schemaBuilder = Schema.builder();
+ for (TableFieldSchema tableFieldSchema : tableFieldSchemas) {
+ FieldType fieldType =
+ fromTableFieldSchemaType(tableFieldSchema.getType(), tableFieldSchema.getFields());
+
+ Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf);
+ if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+ fieldType = FieldType.array(fieldType);
+ }
+
+ // if the mode is not defined or if it is set to NULLABLE, then the field is nullable
+ boolean nullable =
+ !fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent();
+ Field field = Field.of(tableFieldSchema.getName(), fieldType).withNullable(nullable);
+ if (tableFieldSchema.getDescription() != null
+ && !"".equals(tableFieldSchema.getDescription())) {
+ field = field.withDescription(tableFieldSchema.getDescription());
+ }
+ schemaBuilder.addField(field);
+ }
+ return schemaBuilder.build();
+ }
+
private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount());
for (Field schemaField : schema.getFields()) {
@@ -188,6 +263,25 @@ public class BigQueryUtils {
return new TableSchema().setFields(toTableFieldSchema(schema));
}
+ /** Convert a BigQuery {@link TableSchema} to a Beam {@link Schema}. */
+ public static Schema fromTableSchema(TableSchema tableSchema) {
+ return fromTableFieldSchema(tableSchema.getFields());
+ }
+
+ private static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow>
+ TABLE_ROW_TO_BEAM_ROW_FUNCTION = beamSchema -> (TableRow tr) -> toBeamRow(beamSchema, tr);
+
+ public static final BigQueryIO.TypedRead.ToBeamRowFunction<TableRow> tableRowToBeamRow() {
+ return TABLE_ROW_TO_BEAM_ROW_FUNCTION;
+ }
+
+ private static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow>
+ TABLE_ROW_FROM_BEAM_ROW_FUNCTION = ignored -> BigQueryUtils::toTableRow;
+
+ public static final BigQueryIO.TypedRead.FromBeamRowFunction<TableRow> tableRowFromBeamRow() {
+ return TABLE_ROW_FROM_BEAM_ROW_FUNCTION;
+ }
+
private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =
new ToTableRow(SerializableFunctions.identity());
@@ -288,6 +382,36 @@ public class BigQueryUtils {
}
/**
+ * Tries to convert a JSON {@link TableRow} from BigQuery into a Beam {@link Row}.
+ *
+ * <p>Only supports basic types and arrays. Doesn't support date types or structs.
+ */
+ public static Row toBeamRow(Schema rowSchema, TableRow jsonBqRow) {
+ // TODO deprecate toBeamRow(Schema, TableSchema, TableRow) function in favour of this function.
+ // This function attempts to convert TableRows without having access to the
+ // corresponding TableSchema because:
+ // 1. TableSchema contains redundant information already available in the Schema object.
+ // 2. TableSchema objects are not serializable and are therefore harder to propagate through a
+ // pipeline.
+ return rowSchema.getFields().stream()
+ .map(field -> toBeamRowFieldValue(field, jsonBqRow.get(field.getName())))
+ .collect(toRow(rowSchema));
+ }
+
+ private static Object toBeamRowFieldValue(Field field, Object bqValue) {
+ if (bqValue == null) {
+ if (field.getType().getNullable()) {
+ return null;
+ } else {
+ throw new IllegalArgumentException(
+ "Received null value for non-nullable field " + field.getName());
+ }
+ }
+
+ return toBeamValue(field.getType(), bqValue);
+ }
+
+ /**
* Tries to parse the JSON {@link TableRow} from BigQuery.
*
* <p>Only supports basic types and arrays. Doesn't support date types.
@@ -320,11 +444,14 @@ public class BigQueryUtils {
if (jsonBQValue instanceof List) {
return ((List<Object>) jsonBQValue)
.stream()
- .map(v -> ((Map<String, Object>) v).get("v"))
.map(v -> toBeamValue(fieldType.getCollectionElementType(), v))
.collect(toList());
}
+ if (jsonBQValue instanceof TableRow) {
+ return toBeamRow(fieldType.getRowSchema(), (TableRow) jsonBQValue);
+ }
+
throw new UnsupportedOperationException(
"Converting BigQuery type '"
+ jsonBQValue.getClass()
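A small sketch of the new conversion helpers, with placeholder field names and the standard BigQuery model imports (TableSchema, TableFieldSchema, TableRow, java.util.Arrays) assumed; note that BigQuery's JSON responses encode scalar values as strings, which toBeamRow parses according to the Beam field types:

    TableSchema bqSchema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"),
                    new TableFieldSchema().setName("number").setType("INTEGER").setMode("NULLABLE")));

    // REQUIRED -> non-nullable field, NULLABLE (or no mode) -> nullable field,
    // REPEATED -> FieldType.array(...).
    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema);

    // String-encoded scalars are parsed to the Beam types ("1" becomes 1L).
    Row row =
        BigQueryUtils.toBeamRow(
            beamSchema, new TableRow().set("name", "a").set("number", "1"));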
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 883bbdd..c74a0d6 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -51,6 +51,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
@@ -65,6 +67,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
@@ -405,6 +408,65 @@ public class BigQueryIOReadTest implements Serializable {
}
@Test
+ public void testReadTableWithSchema() throws IOException, InterruptedException {
+ // setup
+ Table someTable = new Table();
+ someTable.setSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("name").setType("STRING"),
+ new TableFieldSchema().setName("number").setType("INTEGER"))));
+ someTable.setTableReference(
+ new TableReference()
+ .setProjectId("non-executing-project")
+ .setDatasetId("schema_dataset")
+ .setTableId("schema_table"));
+ someTable.setNumBytes(1024L * 1024L);
+ FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ fakeDatasetService.createDataset("non-executing-project", "schema_dataset", "", "", null);
+ fakeDatasetService.createTable(someTable);
+
+ List<TableRow> records =
+ Lists.newArrayList(
+ new TableRow().set("name", "a").set("number", 1L),
+ new TableRow().set("name", "b").set("number", 2L),
+ new TableRow().set("name", "c").set("number", 3L));
+
+ fakeDatasetService.insertAll(someTable.getTableReference(), records, null);
+
+ FakeBigQueryServices fakeBqServices =
+ new FakeBigQueryServices()
+ .withJobService(new FakeJobService())
+ .withDatasetService(fakeDatasetService);
+
+ // test
+ BigQueryIO.TypedRead<TableRow> read =
+ BigQueryIO.readTableRowsWithSchema()
+ .from("non-executing-project:schema_dataset.schema_table")
+ .withTestServices(fakeBqServices)
+ .withoutValidation();
+
+ PCollection<TableRow> bqRows = p.apply(read);
+
+ Schema expectedSchema =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true),
+ Schema.Field.of("number", Schema.FieldType.INT64).withNullable(true));
+ assertEquals(expectedSchema, bqRows.getSchema());
+
+ PCollection<Row> output = bqRows.apply(Select.fieldNames("name", "number"));
+ PAssert.that(output)
+ .containsInAnyOrder(
+ ImmutableList.of(
+ Row.withSchema(expectedSchema).addValues("a", 1L).build(),
+ Row.withSchema(expectedSchema).addValues("b", 2L).build(),
+ Row.withSchema(expectedSchema).addValues("c", 3L).build()));
+
+ p.run();
+ }
+
+ @Test
public void testBuildSourceDisplayDataTable() {
String tableSpec = "project:dataset.tableid";
@@ -509,12 +571,8 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
- BigQueryTableSource.create(
- stepUuid,
- ValueProvider.StaticValueProvider.of(table),
- fakeBqServices,
- TableRowJsonCoder.of(),
- BigQueryIO.TableRowParser.INSTANCE);
+ BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -562,12 +620,8 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
- BigQueryTableSource.create(
- stepUuid,
- ValueProvider.StaticValueProvider.of(table),
- fakeBqServices,
- TableRowJsonCoder.of(),
- BigQueryIO.TableRowParser.INSTANCE);
+ BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
PipelineOptions options = PipelineOptionsFactory.create();
assertEquals(108, bqSource.getEstimatedSizeBytes(options));
@@ -600,12 +654,8 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
- BigQueryTableSource.create(
- stepUuid,
- ValueProvider.StaticValueProvider.of(table),
- fakeBqServices,
- TableRowJsonCoder.of(),
- BigQueryIO.TableRowParser.INSTANCE);
+ BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
PipelineOptions options = PipelineOptionsFactory.create();
assertEquals(118, bqSource.getEstimatedSizeBytes(options));
@@ -621,18 +671,16 @@ public class BigQueryIOReadTest implements Serializable {
bqOptions.setProject("project");
String stepUuid = "testStepUuid";
- BigQueryQuerySource<TableRow> bqSource =
- BigQueryQuerySource.create(
- stepUuid,
- ValueProvider.StaticValueProvider.of(queryString),
- true /* flattenResults */,
- true /* useLegacySql */,
- fakeBqServices,
- TableRowJsonCoder.of(),
- BigQueryIO.TableRowParser.INSTANCE,
- QueryPriority.BATCH,
- null,
- null);
+ BigQuerySourceBase<TableRow> bqSource =
+ BigQueryQuerySourceDef.create(
+ fakeBqServices,
+ ValueProvider.StaticValueProvider.of(queryString),
+ true, /* flattenResults */
+ true, /* useLegacySql */
+ QueryPriority.BATCH,
+ null,
+ null)
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
fakeJobService.expectDryRunQuery(
bqOptions.getProject(),
@@ -697,17 +745,15 @@ public class BigQueryIOReadTest implements Serializable {
.setReferencedTables(ImmutableList.of(sourceTableRef, tempTableReference))));
BoundedSource<TableRow> bqSource =
- BigQueryQuerySource.create(
- stepUuid,
- ValueProvider.StaticValueProvider.of(encodedQuery),
- true /* flattenResults */,
- true /* useLegacySql */,
- fakeBqServices,
- TableRowJsonCoder.of(),
- BigQueryIO.TableRowParser.INSTANCE,
- QueryPriority.BATCH,
- null,
- null);
+ BigQueryQuerySourceDef.create(
+ fakeBqServices,
+ ValueProvider.StaticValueProvider.of(encodedQuery),
+ true /* flattenResults */,
+ true /* useLegacySql */,
+ QueryPriority.BATCH,
+ null,
+ null)
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -764,17 +810,15 @@ public class BigQueryIOReadTest implements Serializable {
.setReferencedTables(ImmutableList.of())));
BoundedSource<TableRow> bqSource =
- BigQueryQuerySource.create(
- stepUuid,
- ValueProvider.StaticValueProvider.of(encodedQuery),
- true /* flattenResults */,
- true /* useLegacySql */,
- fakeBqServices,
- TableRowJsonCoder.of(),
- BigQueryIO.TableRowParser.INSTANCE,
- QueryPriority.BATCH,
- null,
- null);
+ BigQueryQuerySourceDef.create(
+ fakeBqServices,
+ ValueProvider.StaticValueProvider.of(encodedQuery),
+ true /* flattenResults */,
+ true /* useLegacySql */,
+ QueryPriority.BATCH,
+ null,
+ null)
+ .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index a23a3ec..3315598 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertThrows;
@@ -31,12 +32,14 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions.TruncateTimestamps;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Instant;
+import org.joda.time.chrono.ISOChronology;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -83,22 +86,73 @@ public class BigQueryUtilsTest {
.setType(StandardSQLTypeName.INT64.toString())
.setMode(Mode.REPEATED.toString());
+ private static final TableFieldSchema ROW =
+ new TableFieldSchema()
+ .setName("row")
+ .setType(StandardSQLTypeName.STRUCT.toString())
+ .setMode(Mode.NULLABLE.toString())
+ .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
+
+ private static final TableFieldSchema ROWS =
+ new TableFieldSchema()
+ .setName("rows")
+ .setType(StandardSQLTypeName.STRUCT.toString())
+ .setMode(Mode.REPEATED.toString())
+ .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
+
private static final Row FLAT_ROW =
Row.withSchema(FLAT_TYPE)
.addValues(123L, 123.456, "test", new DateTime(123456), false)
.build();
+ private static final TableRow BQ_FLAT_ROW =
+ new TableRow()
+ .set("id", "123")
+ .set("value", "123.456")
+ .set("name", "test")
+ .set(
+ "timestamp",
+ String.valueOf(
+ new DateTime(123456L, ISOChronology.getInstanceUTC()).getMillis() / 1000.0D))
+ .set("valid", "false");
+
private static final Row NULL_FLAT_ROW =
Row.withSchema(FLAT_TYPE).addValues(null, null, null, null, null).build();
+ private static final TableRow BQ_NULL_FLAT_ROW =
+ new TableRow()
+ .set("id", null)
+ .set("value", null)
+ .set("name", null)
+ .set("timestamp", null)
+ .set("valid", null);
+
private static final Row ARRAY_ROW =
Row.withSchema(ARRAY_TYPE).addValues((Object) Arrays.asList(123L, 124L)).build();
+ private static final TableRow BQ_ARRAY_ROW =
+ new TableRow().set("ids", Arrays.asList("123", "124"));
+
private static final Row ROW_ROW = Row.withSchema(ROW_TYPE).addValues(FLAT_ROW).build();
+ private static final TableRow BQ_ROW_ROW = new TableRow().set("row", BQ_FLAT_ROW);
+
private static final Row ARRAY_ROW_ROW =
Row.withSchema(ARRAY_ROW_TYPE).addValues((Object) Arrays.asList(FLAT_ROW)).build();
+ private static final TableRow BQ_ARRAY_ROW_ROW =
+ new TableRow().set("rows", Collections.singletonList(BQ_FLAT_ROW));
+
+ private static final TableSchema BQ_FLAT_TYPE =
+ new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID));
+
+ private static final TableSchema BQ_ARRAY_TYPE = new TableSchema().setFields(Arrays.asList(IDS));
+
+ private static final TableSchema BQ_ROW_TYPE = new TableSchema().setFields(Arrays.asList(ROW));
+
+ private static final TableSchema BQ_ARRAY_ROW_TYPE =
+ new TableSchema().setFields(Arrays.asList(ROWS));
+
@Test
public void testToTableSchema_flat() {
TableSchema schema = toTableSchema(FLAT_TYPE);
@@ -140,6 +194,7 @@ public class BigQueryUtilsTest {
@Test
public void testToTableRow_flat() {
TableRow row = toTableRow().apply(FLAT_ROW);
+ System.out.println(row);
assertThat(row.size(), equalTo(5));
assertThat(row, hasEntry("id", "123"));
@@ -290,4 +345,58 @@ public class BigQueryUtilsTest {
return base.getMillis();
}
}
+
+ @Test
+ public void testFromTableSchema_flat() {
+ Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_FLAT_TYPE);
+ assertEquals(FLAT_TYPE, beamSchema);
+ }
+
+ @Test
+ public void testFromTableSchema_array() {
+ Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_TYPE);
+ assertEquals(ARRAY_TYPE, beamSchema);
+ }
+
+ @Test
+ public void testFromTableSchema_row() {
+ Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ROW_TYPE);
+ assertEquals(ROW_TYPE, beamSchema);
+ }
+
+ @Test
+ public void testFromTableSchema_array_row() {
+ Schema beamSchema = BigQueryUtils.fromTableSchema(BQ_ARRAY_ROW_TYPE);
+ assertEquals(ARRAY_ROW_TYPE, beamSchema);
+ }
+
+ @Test
+ public void testToBeamRow_flat() {
+ Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_FLAT_ROW);
+ assertEquals(FLAT_ROW, beamRow);
+ }
+
+ @Test
+ public void testToBeamRow_null() {
+ Row beamRow = BigQueryUtils.toBeamRow(FLAT_TYPE, BQ_NULL_FLAT_ROW);
+ assertEquals(NULL_FLAT_ROW, beamRow);
+ }
+
+ @Test
+ public void testToBeamRow_array() {
+ Row beamRow = BigQueryUtils.toBeamRow(ARRAY_TYPE, BQ_ARRAY_ROW);
+ assertEquals(ARRAY_ROW, beamRow);
+ }
+
+ @Test
+ public void testToBeamRow_row() {
+ Row beamRow = BigQueryUtils.toBeamRow(ROW_TYPE, BQ_ROW_ROW);
+ assertEquals(ROW_ROW, beamRow);
+ }
+
+ @Test
+ public void testToBeamRow_array_row() {
+ Row beamRow = BigQueryUtils.toBeamRow(ARRAY_ROW_TYPE, BQ_ARRAY_ROW_ROW);
+ assertEquals(ARRAY_ROW_ROW, beamRow);
+ }
}