You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by sn...@apache.org on 2022/10/06 16:42:19 UTC
[beam] branch master updated: Support custom avro DatumReader when reading from BigQuery (#22718)
This is an automated email from the ASF dual-hosted git repository.
sniemitz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8bdf3579150 Support custom avro DatumReader when reading from BigQuery (#22718)
8bdf3579150 is described below
commit 8bdf35791507505f698c97502a075f0e5822a2b0
Author: Kanishk Karanawat <kk...@gmail.com>
AuthorDate: Thu Oct 6 12:42:07 2022 -0400
Support custom avro DatumReader when reading from BigQuery (#22718)
Co-authored-by: Kanishk Karanawat <kk...@twitter.com>
---
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 125 ++++++++++++++++++-
.../sdk/io/gcp/bigquery/BigQueryQuerySource.java | 10 +-
.../io/gcp/bigquery/BigQueryQuerySourceDef.java | 5 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 44 +++----
.../sdk/io/gcp/bigquery/BigQuerySourceDef.java | 6 +-
.../sdk/io/gcp/bigquery/BigQueryTableSource.java | 10 +-
.../io/gcp/bigquery/BigQueryTableSourceDef.java | 5 +-
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 132 ++++++++++++++++++++-
8 files changed, 282 insertions(+), 55 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 024dcb053b5..760609988d5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -45,6 +45,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -53,9 +54,12 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Decoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
@@ -69,6 +73,7 @@ import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
@@ -122,10 +127,13 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
@@ -573,6 +581,57 @@ public class BigQueryIO {
BigQueryUtils.tableRowFromBeamRow());
}
+ private static class TableSchemaFunction
+ implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
+ @Override
+ public @Nullable TableSchema apply(@Nullable String input) {
+ return BigQueryHelpers.fromJsonString(input, TableSchema.class);
+ }
+ }
+
+ @VisibleForTesting
+ static class GenericDatumTransformer<T> implements DatumReader<T> {
+ private final SerializableFunction<SchemaAndRecord, T> parseFn;
+ private final Supplier<TableSchema> tableSchema;
+ private GenericDatumReader<T> reader;
+ private org.apache.avro.Schema writerSchema;
+ private org.apache.avro.Schema readerSchema;
+
+ public GenericDatumTransformer(
+ SerializableFunction<SchemaAndRecord, T> parseFn,
+ String tableSchema,
+ org.apache.avro.Schema writer,
+ org.apache.avro.Schema reader) {
+ this.parseFn = parseFn;
+ this.tableSchema =
+ Suppliers.memoize(
+ Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(tableSchema)));
+ this.writerSchema = writer;
+ this.readerSchema = reader;
+ this.reader = new GenericDatumReader<>(this.writerSchema, this.readerSchema);
+ }
+
+ @Override
+ public void setSchema(org.apache.avro.Schema schema) {
+ if (this.writerSchema.equals(schema)) {
+ return;
+ }
+
+ this.writerSchema = schema;
+ if (this.readerSchema == null) {
+ this.readerSchema = schema;
+ }
+
+ this.reader = new GenericDatumReader<>(this.writerSchema, this.readerSchema);
+ }
+
+ @Override
+ public T read(T reuse, Decoder in) throws IOException {
+ GenericRecord record = (GenericRecord) this.reader.read(reuse, in);
+ return parseFn.apply(new SchemaAndRecord(record, this.tableSchema.get()));
+ }
+ }
+
/**
* Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
* each row of the table or query result, parsed from the BigQuery AVRO format using the specified
@@ -598,6 +657,21 @@ public class BigQueryIO {
.setValidate(true)
.setWithTemplateCompatibility(false)
.setBigQueryServices(new BigQueryServicesImpl())
+ .setDatumReaderFactory(
+ (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
+ input -> {
+ try {
+ String jsonTableSchema = BigQueryIO.JSON_FACTORY.toString(input);
+ return (AvroSource.DatumReaderFactory<T>)
+ (writer, reader) ->
+ new GenericDatumTransformer<>(parseFn, jsonTableSchema, writer, reader);
+ } catch (IOException e) {
+ LOG.warn(
+ String.format("Error while converting table schema %s to JSON!", input), e);
+ return null;
+ }
+ })
+ // TODO: Remove setParseFn once https://github.com/apache/beam/issues/21076 is fixed.
.setParseFn(parseFn)
.setMethod(TypedRead.Method.DEFAULT)
.setUseAvroLogicalTypes(false)
@@ -606,6 +680,34 @@ public class BigQueryIO {
.build();
}
+ /**
+ * Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
+ * each row of the table or query result. This API directly deserializes BigQuery AVRO data to the
+ * input class, based on the appropriate {@link org.apache.avro.io.DatumReader}.
+ *
+ * <pre>{@code
+ * class ClickEvent { long userId; String url; ... }
+ *
+ * p.apply(BigQueryIO.read(ClickEvent.class)).from("...")
+ * .read((AvroSource.DatumReaderFactory<ClickEvent>) (writer, reader) -> new ReflectDatumReader<>(ReflectData.get().getSchema(ClickEvent.class)));
+ * }</pre>
+ */
+ public static <T> TypedRead<T> readWithDatumReader(
+ AvroSource.DatumReaderFactory<T> readerFactory) {
+ return new AutoValue_BigQueryIO_TypedRead.Builder<T>()
+ .setValidate(true)
+ .setWithTemplateCompatibility(false)
+ .setBigQueryServices(new BigQueryServicesImpl())
+ .setDatumReaderFactory(
+ (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>)
+ input -> readerFactory)
+ .setMethod(TypedRead.Method.DEFAULT)
+ .setUseAvroLogicalTypes(false)
+ .setFormat(DataFormat.AVRO)
+ .setProjectionPushdownApplied(false)
+ .build();
+ }
+
@VisibleForTesting
static class TableRowParser implements SerializableFunction<SchemaAndRecord, TableRow> {
@@ -804,6 +906,9 @@ public class BigQueryIO {
abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
+ abstract Builder<T> setDatumReaderFactory(
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> factoryFn);
+
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setKmsKey(String kmsKey);
@@ -836,7 +941,10 @@ public class BigQueryIO {
abstract BigQueryServices getBigQueryServices();
- abstract SerializableFunction<SchemaAndRecord, T> getParseFn();
+ abstract @Nullable SerializableFunction<SchemaAndRecord, T> getParseFn();
+
+ abstract @Nullable SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>>
+ getDatumReaderFactory();
abstract @Nullable QueryPriority getQueryPriority();
@@ -1065,7 +1173,8 @@ public class BigQueryIO {
getFlattenResults() != null, "flattenResults should not be null if query is set");
checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
}
- checkArgument(getParseFn() != null, "A parseFn is required");
+
+ checkArgument(getDatumReaderFactory() != null, "A datumReaderFactory is required");
// if both toRowFn and fromRowFn values are set, enable Beam schema support
Pipeline p = input.getPipeline();
@@ -1108,7 +1217,7 @@ public class BigQueryIO {
p.apply(
org.apache.beam.sdk.io.Read.from(
sourceDef.toSource(
- staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
+ staticJobUuid, coder, getDatumReaderFactory(), getUseAvroLogicalTypes())));
} else {
// Create a singleton job ID token at execution time.
jobIdTokenCollection =
@@ -1136,7 +1245,10 @@ public class BigQueryIO {
String jobUuid = c.element();
BigQuerySourceBase<T> source =
sourceDef.toSource(
- jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
+ jobUuid,
+ coder,
+ getDatumReaderFactory(),
+ getUseAvroLogicalTypes());
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
ExtractResult res = source.extractFiles(options);
@@ -1169,7 +1281,10 @@ public class BigQueryIO {
String jobUuid = c.sideInput(jobIdTokenView);
BigQuerySourceBase<T> source =
sourceDef.toSource(
- jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
+ jobUuid,
+ coder,
+ getDatumReaderFactory(),
+ getUseAvroLogicalTypes());
List<BoundedSource<T>> sources =
source.createSources(
ImmutableList.of(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 4ca2c647d03..8e3b437bec5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -18,8 +18,10 @@
package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -34,10 +36,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
BigQueryQuerySourceDef queryDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
return new BigQueryQuerySource<>(
- stepUuid, queryDef, bqServices, coder, parseFn, useAvroLogicalTypes);
+ stepUuid, queryDef, bqServices, coder, readerFactory, useAvroLogicalTypes);
}
private final BigQueryQuerySourceDef queryDef;
@@ -47,9 +49,9 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
BigQueryQuerySourceDef queryDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
- super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
+ super(stepUuid, bqServices, coder, readerFactory, useAvroLogicalTypes);
this.queryDef = queryDef;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
index 1d06c819ccf..606d1d3ad88 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySourceDef.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
@@ -153,10 +154,10 @@ class BigQueryQuerySourceDef implements BigQuerySourceDef {
public <T> BigQuerySourceBase<T> toSource(
String stepUuid,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
return BigQueryQuerySource.create(
- stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
+ stepUuid, this, bqServices, coder, readerFactory, useAvroLogicalTypes);
}
/** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index 18beda5c1c6..1f16fb44049 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -28,9 +28,7 @@ import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
-import java.io.Serializable;
import java.util.List;
-import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
@@ -41,9 +39,6 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -74,7 +69,7 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
protected final BigQueryServices bqServices;
private transient @Nullable List<BoundedSource<T>> cachedSplitResult = null;
- private SerializableFunction<SchemaAndRecord, T> parseFn;
+ private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory;
private Coder<T> coder;
private final boolean useAvroLogicalTypes;
@@ -82,12 +77,12 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
String stepUuid,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
this.stepUuid = checkArgumentNotNull(stepUuid, "stepUuid");
this.bqServices = checkArgumentNotNull(bqServices, "bqServices");
this.coder = checkArgumentNotNull(coder, "coder");
- this.parseFn = checkArgumentNotNull(parseFn, "parseFn");
+ this.readerFactory = checkArgumentNotNull(readerFactory, "readerFactory");
this.useAvroLogicalTypes = useAvroLogicalTypes;
}
@@ -239,41 +234,30 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
}
- private static class TableSchemaFunction
- implements Serializable, Function<@Nullable String, @Nullable TableSchema> {
- @Override
- public @Nullable TableSchema apply(@Nullable String input) {
- return BigQueryHelpers.fromJsonString(input, TableSchema.class);
- }
- }
-
List<BoundedSource<T>> createSources(
List<ResourceId> files, TableSchema schema, @Nullable List<MatchResult.Metadata> metadata)
throws IOException, InterruptedException {
+ String avroSchema =
+ BigQueryAvroUtils.toGenericAvroSchema("root", schema.getFields()).toString();
- final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
- SerializableFunction<GenericRecord, T> fnWrapper =
- new SerializableFunction<GenericRecord, T>() {
- private Supplier<TableSchema> schema =
- Suppliers.memoize(
- Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));
-
- @Override
- public T apply(GenericRecord input) {
- return parseFn.apply(new SchemaAndRecord(input, schema.get()));
- }
- };
+ AvroSource.DatumReaderFactory<T> factory = readerFactory.apply(schema);
List<BoundedSource<T>> avroSources = Lists.newArrayList();
// If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
// mode.
if (metadata != null) {
for (MatchResult.Metadata file : metadata) {
- avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
+ avroSources.add(
+ (AvroSource<T>)
+ AvroSource.from(file).withSchema(avroSchema).withDatumReaderFactory(factory));
}
} else {
for (ResourceId file : files) {
- avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
+ avroSources.add(
+ (AvroSource<T>)
+ AvroSource.from(file.toString())
+ .withSchema(avroSchema)
+ .withDatumReaderFactory(factory));
}
}
return ImmutableList.copyOf(avroSources);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
index 83006181df1..9532a2f4d6f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceDef.java
@@ -17,10 +17,12 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -34,7 +36,7 @@ interface BigQuerySourceDef extends Serializable {
*
* @param stepUuid Job UUID
* @param coder Coder
- * @param parseFn Parse function
+ * @param readerFactory Reader factory
* @param useAvroLogicalTypes Use avro logical types i.e DATE, TIME
* @param <T> Type of the resulting PCollection
* @return An implementation of {@link BigQuerySourceBase}
@@ -42,7 +44,7 @@ interface BigQuerySourceDef extends Serializable {
<T> BigQuerySourceBase<T> toSource(
String stepUuid,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes);
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index f944ae673b6..2d274ed0e51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -19,9 +19,11 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -38,10 +40,10 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
BigQueryTableSourceDef tableDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
return new BigQueryTableSource<>(
- stepUuid, tableDef, bqServices, coder, parseFn, useAvroLogicalTypes);
+ stepUuid, tableDef, bqServices, coder, readerFactory, useAvroLogicalTypes);
}
private final BigQueryTableSourceDef tableDef;
@@ -52,9 +54,9 @@ class BigQueryTableSource<T> extends BigQuerySourceBase<T> {
BigQueryTableSourceDef tableDef,
BigQueryServices bqServices,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
- super(stepUuid, bqServices, coder, parseFn, useAvroLogicalTypes);
+ super(stepUuid, bqServices, coder, readerFactory, useAvroLogicalTypes);
this.tableDef = tableDef;
this.tableSizeBytes = new AtomicReference<>();
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
index 72de3f56e6c..e78ea0b5d7c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSourceDef.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
@@ -95,10 +96,10 @@ class BigQueryTableSourceDef implements BigQuerySourceDef {
public <T> BigQuerySourceBase<T> toSource(
String stepUuid,
Coder<T> coder,
- SerializableFunction<SchemaAndRecord, T> parseFn,
+ SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<T>> readerFactory,
boolean useAvroLogicalTypes) {
return BigQueryTableSource.create(
- stepUuid, this, bqServices, coder, parseFn, useAvroLogicalTypes);
+ stepUuid, this, bqServices, coder, readerFactory, useAvroLogicalTypes);
}
/** {@inheritDoc} */
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index f2114c8eb07..75202b4076b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -40,11 +40,15 @@ import java.math.BigInteger;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
@@ -130,6 +134,21 @@ public class BigQueryIOReadTest implements Serializable {
.withDatasetService(fakeDatasetService)
.withJobService(fakeJobService);
+ private SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>
+ datumReaderFactoryFn =
+ (SerializableFunction<TableSchema, AvroSource.DatumReaderFactory<TableRow>>)
+ input -> {
+ try {
+ String jsonSchema = BigQueryIO.JSON_FACTORY.toString(input);
+ return (AvroSource.DatumReaderFactory<TableRow>)
+ (writer, reader) ->
+ new BigQueryIO.GenericDatumTransformer<>(
+ BigQueryIO.TableRowParser.INSTANCE, jsonSchema, writer, reader);
+ } catch (IOException e) {
+ return null;
+ }
+ };
+
private void checkSetsProject(String projectId) throws Exception {
fakeDatasetService.createDataset(projectId, "dataset-id", "", "", null);
String tableId = "sometable";
@@ -490,6 +509,107 @@ public class BigQueryIOReadTest implements Serializable {
p.run();
}
+ static class User extends SpecificRecordBase {
+ private static final org.apache.avro.Schema schema =
+ org.apache.avro.SchemaBuilder.record("User")
+ .namespace("org.apache.beam.sdk.io.gcp.bigquery.BigQueryIOReadTest$")
+ .fields()
+ .optionalString("name")
+ .endRecord();
+
+ private String name;
+
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public User() {}
+
+ @Override
+ public void put(int i, Object v) {
+ if (i == 0) {
+ setName(((org.apache.avro.util.Utf8) v).toString());
+ }
+ }
+
+ @Override
+ public Object get(int i) {
+ if (i == 0) {
+ return getName();
+ }
+ return null;
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return schema;
+ }
+
+ public static org.apache.avro.Schema getAvroSchema() {
+ return schema;
+ }
+ }
+
+ @Test
+ public void testReadTableWithReaderDatumFactory() throws IOException, InterruptedException {
+ // setup
+ Table someTable = new Table();
+ someTable.setSchema(
+ new TableSchema()
+ .setFields(ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING"))));
+ someTable.setTableReference(
+ new TableReference()
+ .setProjectId("non-executing-project")
+ .setDatasetId("schema_dataset")
+ .setTableId("schema_table"));
+ someTable.setNumBytes(1024L * 1024L);
+ FakeDatasetService fakeDatasetService = new FakeDatasetService();
+ fakeDatasetService.createDataset("non-executing-project", "schema_dataset", "", "", null);
+ fakeDatasetService.createTable(someTable);
+
+ List<TableRow> records =
+ Lists.newArrayList(
+ new TableRow().set("name", "a"),
+ new TableRow().set("name", "b"),
+ new TableRow().set("name", "c"),
+ new TableRow().set("name", "d"));
+
+ fakeDatasetService.insertAll(someTable.getTableReference(), records, null);
+
+ FakeBigQueryServices fakeBqServices =
+ new FakeBigQueryServices()
+ .withJobService(new FakeJobService())
+ .withDatasetService(fakeDatasetService);
+
+ BigQueryIO.TypedRead<User> read =
+ BigQueryIO.readWithDatumReader(
+ (AvroSource.DatumReaderFactory<User>)
+ (writer, reader) -> new SpecificDatumReader<>(User.getAvroSchema()))
+ .from("non-executing-project:schema_dataset.schema_table")
+ .withTestServices(fakeBqServices)
+ .withoutValidation()
+ .withCoder(SerializableCoder.of(User.class));
+
+ PCollection<User> bqRows = p.apply(read);
+
+ User a = new User();
+ a.setName("a");
+ User b = new User();
+ b.setName("b");
+ User c = new User();
+ c.setName("c");
+ User d = new User();
+ d.setName("d");
+
+ PAssert.that(bqRows).containsInAnyOrder(ImmutableList.of(a, b, c, d));
+
+ p.run();
+ }
+
@Test
public void testBuildSourceDisplayDataTable() {
String tableSpec = "project:dataset.tableid";
@@ -558,7 +678,7 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+ .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -607,7 +727,7 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+ .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
PipelineOptions options = PipelineOptionsFactory.create();
@@ -645,7 +765,7 @@ public class BigQueryIOReadTest implements Serializable {
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+ .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
PipelineOptions options = PipelineOptionsFactory.create();
@@ -676,7 +796,7 @@ public class BigQueryIOReadTest implements Serializable {
null,
null,
null)
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+ .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
fakeJobService.expectDryRunQuery(
bqOptions.getProject(),
@@ -752,7 +872,7 @@ public class BigQueryIOReadTest implements Serializable {
null,
null,
null)
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+ .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
@@ -818,7 +938,7 @@ public class BigQueryIOReadTest implements Serializable {
null,
null,
null)
- .toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE, false);
+ .toSource(stepUuid, TableRowJsonCoder.of(), datumReaderFactoryFn, false);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());